// mz_catalog/memory/objects.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The current types used by the in-memory Catalog. Many of the objects in this module are
11//! extremely similar to the objects found in [`crate::durable::objects`] but in a format that is
12//! easier consumed by higher layers.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_compute_client::logging::LogVariant;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_compute_types::plan::Plan as ComputePlan;
26use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
27use mz_controller_types::{ClusterId, ReplicaId};
28use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
29use mz_ore::collections::CollectionExt;
30use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
31use mz_repr::network_policy_id::NetworkPolicyId;
32use mz_repr::optimize::OptimizerFeatureOverrides;
33use mz_repr::refresh_schedule::RefreshSchedule;
34use mz_repr::role_id::RoleId;
35use mz_repr::{
36    CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
37    RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
38};
39use mz_sql::ast::display::AstDisplay;
40use mz_sql::ast::{
41    ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
42    UnresolvedItemName, Value, WithOptionValue,
43};
44use mz_sql::catalog::{
45    CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
46    CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogType,
47    CatalogTypeDetails, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference,
48    RoleAttributes, RoleMembership, RoleVars, SystemObjectType,
49};
50use mz_sql::names::{
51    Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
52    QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
53};
54use mz_sql::plan::{
55    ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
56    CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
57    HirRelationExpr, NetworkPolicyRule, PlanError, WebhookBodyFormat, WebhookHeaders,
58    WebhookValidation,
59};
60use mz_sql::rbac;
61use mz_sql::session::vars::OwnedVarInput;
62use mz_storage_client::controller::IntrospectionType;
63use mz_storage_types::connections::inline::ReferencedConnection;
64use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
65use mz_storage_types::sources::load_generator::LoadGenerator;
66use mz_storage_types::sources::{
67    GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
68    SourceExportDetails, Timeline,
69};
70use mz_transform::dataflow::DataflowMetainfo;
71use mz_transform::notice::OptimizerNotice;
72use serde::ser::SerializeSeq;
73use serde::{Deserialize, Serialize};
74use timely::progress::Antichain;
75use tracing::debug;
76
77use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
78use crate::durable;
79use crate::durable::objects::item_type;
80
/// Used to update `self` from the input value while consuming the input value.
///
/// Unlike [`From`], this mutates an existing in-memory object in place,
/// allowing fields that are not durably persisted (e.g. derived lookup maps)
/// to be preserved across the update.
pub trait UpdateFrom<T>: From<T> {
    /// Overwrites the durable-backed fields of `self` with values from `from`.
    fn update_from(&mut self, from: T);
}
85
/// In-memory representation of a database in the catalog.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    pub name: String,
    pub id: DatabaseId,
    pub oid: u32,
    /// Schemas contained in this database, keyed by schema ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Index from schema name to schema ID.
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    /// The role that owns this database.
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
97
98impl From<Database> for durable::Database {
99    fn from(database: Database) -> durable::Database {
100        durable::Database {
101            id: database.id,
102            oid: database.oid,
103            name: database.name,
104            owner_id: database.owner_id,
105            privileges: database.privileges.into_all_values().collect(),
106        }
107    }
108}
109
110impl From<durable::Database> for Database {
111    fn from(
112        durable::Database {
113            id,
114            oid,
115            name,
116            owner_id,
117            privileges,
118        }: durable::Database,
119    ) -> Database {
120        Database {
121            id,
122            oid,
123            schemas_by_id: BTreeMap::new(),
124            schemas_by_name: BTreeMap::new(),
125            name,
126            owner_id,
127            privileges: PrivilegeMap::from_mz_acl_items(privileges),
128        }
129    }
130}
131
132impl UpdateFrom<durable::Database> for Database {
133    fn update_from(
134        &mut self,
135        durable::Database {
136            id,
137            oid,
138            name,
139            owner_id,
140            privileges,
141        }: durable::Database,
142    ) {
143        self.id = id;
144        self.oid = oid;
145        self.name = name;
146        self.owner_id = owner_id;
147        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
148    }
149}
150
/// In-memory representation of a schema in the catalog.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
    pub name: QualifiedSchemaName,
    pub id: SchemaSpecifier,
    pub oid: u32,
    /// Non-function, non-type items in this schema, keyed by item name.
    pub items: BTreeMap<String, CatalogItemId>,
    /// Functions in this schema, keyed by function name.
    pub functions: BTreeMap<String, CatalogItemId>,
    /// Types in this schema, keyed by type name.
    pub types: BTreeMap<String, CatalogItemId>,
    /// The role that owns this schema.
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
162
163impl From<Schema> for durable::Schema {
164    fn from(schema: Schema) -> durable::Schema {
165        durable::Schema {
166            id: schema.id.into(),
167            oid: schema.oid,
168            name: schema.name.schema,
169            database_id: schema.name.database.id(),
170            owner_id: schema.owner_id,
171            privileges: schema.privileges.into_all_values().collect(),
172        }
173    }
174}
175
176impl From<durable::Schema> for Schema {
177    fn from(
178        durable::Schema {
179            id,
180            oid,
181            name,
182            database_id,
183            owner_id,
184            privileges,
185        }: durable::Schema,
186    ) -> Schema {
187        Schema {
188            name: QualifiedSchemaName {
189                database: database_id.into(),
190                schema: name,
191            },
192            id: id.into(),
193            oid,
194            items: BTreeMap::new(),
195            functions: BTreeMap::new(),
196            types: BTreeMap::new(),
197            owner_id,
198            privileges: PrivilegeMap::from_mz_acl_items(privileges),
199        }
200    }
201}
202
203impl UpdateFrom<durable::Schema> for Schema {
204    fn update_from(
205        &mut self,
206        durable::Schema {
207            id,
208            oid,
209            name,
210            database_id,
211            owner_id,
212            privileges,
213        }: durable::Schema,
214    ) {
215        self.name = QualifiedSchemaName {
216            database: database_id.into(),
217            schema: name,
218        };
219        self.id = id.into();
220        self.oid = oid;
221        self.owner_id = owner_id;
222        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
223    }
224}
225
/// In-memory representation of a role in the catalog.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    pub name: String,
    pub id: RoleId,
    pub oid: u32,
    pub attributes: RoleAttributes,
    /// Other roles this role is a member of.
    pub membership: RoleMembership,
    /// Per-role session variable defaults.
    pub vars: RoleVars,
}
235
236impl Role {
237    pub fn is_user(&self) -> bool {
238        self.id.is_user()
239    }
240
241    pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
242        self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
243    }
244}
245
246impl From<Role> for durable::Role {
247    fn from(role: Role) -> durable::Role {
248        durable::Role {
249            id: role.id,
250            oid: role.oid,
251            name: role.name,
252            attributes: role.attributes,
253            membership: role.membership,
254            vars: role.vars,
255        }
256    }
257}
258
259impl From<durable::Role> for Role {
260    fn from(
261        durable::Role {
262            id,
263            oid,
264            name,
265            attributes,
266            membership,
267            vars,
268        }: durable::Role,
269    ) -> Self {
270        Role {
271            name,
272            id,
273            oid,
274            attributes,
275            membership,
276            vars,
277        }
278    }
279}
280
281impl UpdateFrom<durable::Role> for Role {
282    fn update_from(
283        &mut self,
284        durable::Role {
285            id,
286            oid,
287            name,
288            attributes,
289            membership,
290            vars,
291        }: durable::Role,
292    ) {
293        self.id = id;
294        self.oid = oid;
295        self.name = name;
296        self.attributes = attributes;
297        self.membership = membership;
298        self.vars = vars;
299    }
300}
301
/// In-memory representation of a role's authentication material.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RoleAuth {
    /// The role this authentication record belongs to.
    pub role_id: RoleId,
    /// Hash of the role's password, if a password is set.
    pub password_hash: Option<String>,
    // NOTE(review): epoch-based timestamp of the last update; the unit
    // (seconds vs. milliseconds) is not evident from this file — confirm
    // against `durable::RoleAuth`.
    pub updated_at: u64,
}
308
309impl From<RoleAuth> for durable::RoleAuth {
310    fn from(role_auth: RoleAuth) -> durable::RoleAuth {
311        durable::RoleAuth {
312            role_id: role_auth.role_id,
313            password_hash: role_auth.password_hash,
314            updated_at: role_auth.updated_at,
315        }
316    }
317}
318
319impl From<durable::RoleAuth> for RoleAuth {
320    fn from(
321        durable::RoleAuth {
322            role_id,
323            password_hash,
324            updated_at,
325        }: durable::RoleAuth,
326    ) -> RoleAuth {
327        RoleAuth {
328            role_id,
329            password_hash,
330            updated_at,
331        }
332    }
333}
334
335impl UpdateFrom<durable::RoleAuth> for RoleAuth {
336    fn update_from(&mut self, from: durable::RoleAuth) {
337        self.role_id = from.role_id;
338        self.password_hash = from.password_hash;
339    }
340}
341
/// In-memory representation of a cluster in the catalog.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    pub name: String,
    pub id: ClusterId,
    pub config: ClusterConfig,
    /// Introspection log indexes installed on this cluster, keyed by log variant.
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    /// Objects bound to this cluster. Does not include introspection source
    /// indexes.
    pub bound_objects: BTreeSet<CatalogItemId>,
    /// Index from replica name to replica ID.
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    /// Replicas of this cluster, keyed by replica ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    /// The role that owns this cluster.
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
358
impl Cluster {
    /// The role of the cluster. Currently used to set alert severity.
    pub fn role(&self) -> ClusterRole {
        // NOTE - These roles power monitoring systems. Do not change
        // them without talking to the cloud or observability groups.
        if self.name == MZ_SYSTEM_CLUSTER.name {
            ClusterRole::SystemCritical
        } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
            ClusterRole::System
        } else {
            ClusterRole::User
        }
    }

    /// Returns `true` if the cluster is a managed cluster.
    pub fn is_managed(&self) -> bool {
        matches!(self.config.variant, ClusterVariant::Managed { .. })
    }

    /// Lists the user replicas, which are those that do not have the internal flag set.
    pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.replicas().filter(|r| !r.config.location.internal())
    }

    /// Lists all replicas in the cluster
    pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.replicas_by_id_.values()
    }

    /// Lookup a replica by ID.
    pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
        self.replicas_by_id_.get(&replica_id)
    }

    /// Lookup a replica ID by name.
    pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
        self.replica_id_by_name_.get(name).copied()
    }

    /// Returns the availability zones of this cluster, if they exist.
    pub fn availability_zones(&self) -> Option<&[String]> {
        match &self.config.variant {
            ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
            ClusterVariant::Unmanaged => None,
        }
    }

    /// Attempts to reconstruct a [`CreateClusterPlan`] that would re-create
    /// this cluster.
    ///
    /// # Errors
    ///
    /// Returns [`PlanError::Unsupported`] for unmanaged clusters, which this
    /// conversion does not support (they are deprecated; see the comment below).
    pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
        let name = self.name.clone();
        let variant = match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged {
                size,
                availability_zones,
                logging,
                replication_factor,
                optimizer_feature_overrides,
                schedule,
            }) => {
                // Introspection is configured iff a logging interval is set;
                // `log_logging` alone (with no interval) means "off".
                let introspection = match logging {
                    ReplicaLogging {
                        log_logging,
                        interval: Some(interval),
                    } => Some(ComputeReplicaIntrospectionConfig {
                        debugging: *log_logging,
                        interval: interval.clone(),
                    }),
                    ReplicaLogging {
                        log_logging: _,
                        interval: None,
                    } => None,
                };
                let compute = ComputeReplicaConfig { introspection };
                CreateClusterVariant::Managed(CreateClusterManagedPlan {
                    replication_factor: replication_factor.clone(),
                    size: size.clone(),
                    availability_zones: availability_zones.clone(),
                    compute,
                    optimizer_feature_overrides: optimizer_feature_overrides.clone(),
                    schedule: schedule.clone(),
                })
            }
            ClusterVariant::Unmanaged => {
                // Unmanaged clusters are deprecated, so hopefully we can remove
                // them before we have to implement this.
                return Err(PlanError::Unsupported {
                    feature: "SHOW CREATE for unmanaged clusters".to_string(),
                    discussion_no: None,
                });
            }
        };
        let workload_class = self.config.workload_class.clone();
        Ok(CreateClusterPlan {
            name,
            variant,
            workload_class,
        })
    }
}
457
458impl From<Cluster> for durable::Cluster {
459    fn from(cluster: Cluster) -> durable::Cluster {
460        durable::Cluster {
461            id: cluster.id,
462            name: cluster.name,
463            owner_id: cluster.owner_id,
464            privileges: cluster.privileges.into_all_values().collect(),
465            config: cluster.config.into(),
466        }
467    }
468}
469
470impl From<durable::Cluster> for Cluster {
471    fn from(
472        durable::Cluster {
473            id,
474            name,
475            owner_id,
476            privileges,
477            config,
478        }: durable::Cluster,
479    ) -> Self {
480        Cluster {
481            name: name.clone(),
482            id,
483            bound_objects: BTreeSet::new(),
484            log_indexes: BTreeMap::new(),
485            replica_id_by_name_: BTreeMap::new(),
486            replicas_by_id_: BTreeMap::new(),
487            owner_id,
488            privileges: PrivilegeMap::from_mz_acl_items(privileges),
489            config: config.into(),
490        }
491    }
492}
493
494impl UpdateFrom<durable::Cluster> for Cluster {
495    fn update_from(
496        &mut self,
497        durable::Cluster {
498            id,
499            name,
500            owner_id,
501            privileges,
502            config,
503        }: durable::Cluster,
504    ) {
505        self.id = id;
506        self.name = name;
507        self.owner_id = owner_id;
508        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
509        self.config = config.into();
510    }
511}
512
/// In-memory representation of a single replica of a cluster.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    pub name: String,
    /// The cluster this replica belongs to.
    pub cluster_id: ClusterId,
    pub replica_id: ReplicaId,
    pub config: ReplicaConfig,
    /// The role that owns this replica.
    pub owner_id: RoleId,
}
521
522impl From<ClusterReplica> for durable::ClusterReplica {
523    fn from(replica: ClusterReplica) -> durable::ClusterReplica {
524        durable::ClusterReplica {
525            cluster_id: replica.cluster_id,
526            replica_id: replica.replica_id,
527            name: replica.name,
528            config: replica.config.into(),
529            owner_id: replica.owner_id,
530        }
531    }
532}
533
/// A status observation for one process of a cluster replica.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    /// The observed status.
    pub status: ClusterStatus,
    /// When the status was observed.
    pub time: DateTime<Utc>,
}
539
/// The set of upstream references known for a source, with the time they
/// were last refreshed.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
    // NOTE(review): epoch-based timestamp; unit not evident from this file.
    pub updated_at: u64,
    pub references: Vec<SourceReference>,
}
545
/// A single upstream object (e.g. a table) that a source can reference.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
    pub name: String,
    /// The upstream namespace of the referenced object, if any.
    pub namespace: Option<String>,
    /// Column names of the referenced object.
    pub columns: Vec<String>,
}
552
553impl From<SourceReference> for durable::SourceReference {
554    fn from(source_reference: SourceReference) -> durable::SourceReference {
555        durable::SourceReference {
556            name: source_reference.name,
557            namespace: source_reference.namespace,
558            columns: source_reference.columns,
559        }
560    }
561}
562
563impl SourceReferences {
564    pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
565        durable::SourceReferences {
566            source_id,
567            updated_at: self.updated_at,
568            references: self.references.into_iter().map(Into::into).collect(),
569        }
570    }
571}
572
573impl From<durable::SourceReference> for SourceReference {
574    fn from(source_reference: durable::SourceReference) -> SourceReference {
575        SourceReference {
576            name: source_reference.name,
577            namespace: source_reference.namespace,
578            columns: source_reference.columns,
579        }
580    }
581}
582
583impl From<durable::SourceReferences> for SourceReferences {
584    fn from(source_references: durable::SourceReferences) -> SourceReferences {
585        SourceReferences {
586            updated_at: source_references.updated_at,
587            references: source_references
588                .references
589                .into_iter()
590                .map(|source_reference| source_reference.into())
591                .collect(),
592        }
593    }
594}
595
596impl From<mz_sql::plan::SourceReference> for SourceReference {
597    fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
598        SourceReference {
599            name: source_reference.name,
600            namespace: source_reference.namespace,
601            columns: source_reference.columns,
602        }
603    }
604}
605
606impl From<mz_sql::plan::SourceReferences> for SourceReferences {
607    fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
608        SourceReferences {
609            updated_at: source_references.updated_at,
610            references: source_references
611                .references
612                .into_iter()
613                .map(|source_reference| source_reference.into())
614                .collect(),
615        }
616    }
617}
618
619impl From<SourceReferences> for mz_sql::plan::SourceReferences {
620    fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
621        mz_sql::plan::SourceReferences {
622            updated_at: source_references.updated_at,
623            references: source_references
624                .references
625                .into_iter()
626                .map(|source_reference| source_reference.into())
627                .collect(),
628        }
629    }
630}
631
632impl From<SourceReference> for mz_sql::plan::SourceReference {
633    fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
634        mz_sql::plan::SourceReference {
635            name: source_reference.name,
636            namespace: source_reference.namespace,
637            columns: source_reference.columns,
638        }
639    }
640}
641
/// An entry in the in-memory catalog: an item plus its identity, namespace,
/// ownership, and dependency bookkeeping.
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    pub item: CatalogItem,
    /// Items that reference this entry (by SQL reference).
    #[serde(skip)]
    pub referenced_by: Vec<CatalogItemId>,
    // TODO(database-issues#7922)––this should have an invariant tied to it that all
    // dependents (i.e. entries in this field) have IDs greater than this
    // entry's ID.
    #[serde(skip)]
    pub used_by: Vec<CatalogItemId>,
    pub id: CatalogItemId,
    pub oid: u32,
    pub name: QualifiedItemName,
    /// The role that owns this entry.
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
658
/// A [`CatalogEntry`] that is associated with a specific "collection" of data.
/// A single item in the catalog may be associated with multiple "collections".
///
/// Here "collection" generally means a pTVC, e.g. a Persist Shard, an Index, a
/// currently running dataflow, etc.
///
/// Items in the Catalog have a stable name -> ID mapping, in other words for
/// the entire lifetime of an object its [`CatalogItemId`] will _never_ change.
/// Similarly, we need to maintain a stable mapping from [`GlobalId`] to pTVC.
/// This presents a challenge when `ALTER`-ing an object, e.g. adding columns
/// to a table. We can't just change the schema of the underlying Persist Shard
/// because that would be rebinding the [`GlobalId`] of the pTVC. Instead we
/// allocate a new [`GlobalId`] to refer to the new version of the table, and
/// then the [`CatalogEntry`] tracks the [`GlobalId`] for each version.
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
    /// The underlying catalog entry.
    pub entry: CatalogEntry,
    /// Which relation version of the entry this collection refers to.
    pub version: RelationVersionSelector,
}
678
impl CatalogCollectionEntry {
    /// Returns the [`RelationDesc`] of the item at this entry's selected
    /// version, if the item has one.
    pub fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
        self.item().relation_desc(self.version)
    }
}
684
impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
    /// Returns the item's [`RelationDesc`] at this entry's selected version.
    fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
        self.item().relation_desc(self.version)
    }

    /// Returns the [`GlobalId`] bound to this entry's selected version.
    ///
    /// Panics if the entry has no [`GlobalId`] for that version, which would
    /// indicate catalog corruption.
    fn global_id(&self) -> GlobalId {
        self.entry
            .item()
            .global_id_for_version(self.version)
            .expect("catalog corruption, missing version!")
    }
}
697
// Lets a `CatalogCollectionEntry` be used anywhere a `&CatalogEntry` is
// expected, transparently exposing the wrapped entry's methods and fields.
impl Deref for CatalogCollectionEntry {
    type Target = CatalogEntry;

    fn deref(&self) -> &CatalogEntry {
        &self.entry
    }
}
705
/// Forwards every [`mz_sql::catalog::CatalogItem`] method to the wrapped
/// [`CatalogEntry`], except `at_version`, which re-wraps the entry at the
/// requested version.
impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
    fn name(&self) -> &QualifiedItemName {
        self.entry.name()
    }

    fn id(&self) -> CatalogItemId {
        self.entry.id()
    }

    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.entry.global_ids())
    }

    fn oid(&self) -> u32 {
        self.entry.oid()
    }

    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.entry.func()
    }

    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.entry.source_desc()
    }

    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        // Fully-qualified syntax selects the trait's `connection` on the
        // wrapped entry.
        mz_sql::catalog::CatalogItem::connection(&self.entry)
    }

    fn create_sql(&self) -> &str {
        self.entry.create_sql()
    }

    fn item_type(&self) -> SqlCatalogItemType {
        self.entry.item_type()
    }

    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        self.entry.index_details()
    }

    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        self.entry.writable_table_details()
    }

    fn replacement_target(&self) -> Option<CatalogItemId> {
        self.entry.replacement_target()
    }

    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        self.entry.type_details()
    }

    fn references(&self) -> &ResolvedIds {
        self.entry.references()
    }

    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.entry.uses()
    }

    fn referenced_by(&self) -> &[CatalogItemId] {
        self.entry.referenced_by()
    }

    fn used_by(&self) -> &[CatalogItemId] {
        self.entry.used_by()
    }

    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.entry.subsource_details()
    }

    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.entry.source_export_details()
    }

    fn is_progress_source(&self) -> bool {
        self.entry.is_progress_source()
    }

    fn progress_id(&self) -> Option<CatalogItemId> {
        self.entry.progress_id()
    }

    fn owner_id(&self) -> RoleId {
        // The trait returns `RoleId` by value; the entry stores it behind a
        // reference, so copy it out.
        *self.entry.owner_id()
    }

    fn privileges(&self) -> &PrivilegeMap {
        self.entry.privileges()
    }

    fn cluster_id(&self) -> Option<ClusterId> {
        self.entry.item().cluster_id()
    }

    // Clones the underlying entry into a new collection entry pinned at
    // `version`, rather than delegating.
    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        Box::new(CatalogCollectionEntry {
            entry: self.entry.clone(),
            version,
        })
    }

    fn latest_version(&self) -> Option<RelationVersion> {
        self.entry.latest_version()
    }
}
829
/// The kinds of items (objects) that can live in the in-memory catalog, each
/// variant carrying that item's type-specific state.
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    Table(Table),
    Source(Source),
    Log(Log),
    View(View),
    MaterializedView(MaterializedView),
    Sink(Sink),
    Index(Index),
    Type(Type),
    Func(Func),
    Secret(Secret),
    Connection(Connection),
}
844
845impl From<CatalogEntry> for durable::Item {
846    fn from(entry: CatalogEntry) -> durable::Item {
847        let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
848        durable::Item {
849            id: entry.id,
850            oid: entry.oid,
851            global_id,
852            schema_id: entry.name.qualifiers.schema_spec.into(),
853            name: entry.name.item,
854            create_sql,
855            owner_id: entry.owner_id,
856            privileges: entry.privileges.into_all_values().collect(),
857            extra_versions,
858        }
859    }
860}
861
/// In-memory representation of a table in the catalog.
#[derive(Debug, Clone, Serialize)]
pub struct Table {
    /// Parse-able SQL that defines this table.
    pub create_sql: Option<String>,
    /// [`VersionedRelationDesc`] of this table, derived from the `create_sql`.
    pub desc: VersionedRelationDesc,
    /// Versions of this table, and the [`GlobalId`]s that refer to them.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this table, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the table's logical compaction window is controlled by the ['metrics_retention']
    /// session variable.
    ///
    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
    /// Where data for this table comes from, e.g. `INSERT` statements or an upstream source.
    pub data_source: TableDataSource,
}
886
887impl Table {
888    pub fn timeline(&self) -> Timeline {
889        match &self.data_source {
890            // The Coordinator controls insertions for writable tables
891            // (including system tables), so they are realtime.
892            TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
893            TableDataSource::DataSource { timeline, .. } => timeline.clone(),
894        }
895    }
896
897    /// Returns all of the [`GlobalId`]s that this [`Table`] can be referenced by.
898    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
899        self.collections.values().copied()
900    }
901
902    /// Returns the latest [`GlobalId`] for this [`Table`] which should be used for writes.
903    pub fn global_id_writes(&self) -> GlobalId {
904        *self
905            .collections
906            .last_key_value()
907            .expect("at least one version of a table")
908            .1
909    }
910
911    /// Returns all of the collections and their [`RelationDesc`]s associated with this [`Table`].
912    pub fn collection_descs(
913        &self,
914    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
915        self.collections.iter().map(|(version, gid)| {
916            let desc = self
917                .desc
918                .at_version(RelationVersionSelector::Specific(*version));
919            (*gid, *version, desc)
920        })
921    }
922
923    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
924    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
925        let (version, _gid) = self
926            .collections
927            .iter()
928            .find(|(_version, gid)| *gid == id)
929            .expect("GlobalId to exist");
930        self.desc
931            .at_version(RelationVersionSelector::Specific(*version))
932    }
933}
934
/// Describes where a [`Table`]'s data comes from.
#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
    /// The table owns data created via INSERT/UPDATE/DELETE statements.
    TableWrites {
        /// Default expressions for the table's columns.
        #[serde(skip)]
        defaults: Vec<Expr<Aug>>,
    },

    /// The table receives its data from the identified `DataSourceDesc`.
    /// This table type does not support INSERT/UPDATE/DELETE statements.
    DataSource {
        desc: DataSourceDesc,
        timeline: Timeline,
    },
}
950
/// Describes where a source's data comes from.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum DataSourceDesc {
    /// Receives data from an external system
    Ingestion {
        /// Description of the ingestion, including its connection.
        desc: SourceDesc<ReferencedConnection>,
        /// The cluster this ingestion runs on.
        cluster_id: ClusterId,
    },
    /// Receives data from an external system; created via the old `CREATE SOURCE`
    /// syntax (see `mz_sql::plan::DataSourceDesc::OldSyntaxIngestion`).
    OldSyntaxIngestion {
        /// Description of the ingestion, including its connection.
        desc: SourceDesc<ReferencedConnection>,
        /// The cluster this ingestion runs on.
        cluster_id: ClusterId,
        // If we're dealing with an old syntax ingestion the progress id will be some other collection
        // and the ingestion itself will have the data from an external reference
        progress_subsource: CatalogItemId,
        /// Encoding/envelope configuration for the ingested data.
        data_config: SourceExportDataConfig<ReferencedConnection>,
        /// Source-specific details for this export.
        details: SourceExportDetails,
    },
    /// This source receives its data from the identified ingestion,
    /// specifically the output identified by `external_reference`.
    /// N.B. that `external_reference` should not be used to identify
    /// anything downstream of purification, as the purification process
    /// encodes source-specific identifiers into the `details` struct.
    /// The `external_reference` field is only used here for displaying
    /// human-readable names in system tables.
    IngestionExport {
        ingestion_id: CatalogItemId,
        external_reference: UnresolvedItemName,
        details: SourceExportDetails,
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// Receives introspection data from an internal system
    Introspection(IntrospectionType),
    /// Receives data from the source's reclocking/remapping operations.
    Progress,
    /// Receives data from HTTP requests.
    Webhook {
        /// Optional components used to validation a webhook request.
        validate_using: Option<WebhookValidation>,
        /// Describes how we deserialize the body of a webhook request.
        body_format: WebhookBodyFormat,
        /// Describes whether or not to include headers and how to map them.
        headers: WebhookHeaders,
        /// The cluster which this source is associated with.
        cluster_id: ClusterId,
    },
    /// Exposes the contents of the catalog shard.
    Catalog,
}
999
1000impl From<IntrospectionType> for DataSourceDesc {
1001    fn from(typ: IntrospectionType) -> Self {
1002        Self::Introspection(typ)
1003    }
1004}
1005
1006impl DataSourceDesc {
1007    /// The key and value formats of the data source.
1008    pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1009        match &self {
1010            DataSourceDesc::Ingestion { .. } => (None, None),
1011            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1012                match &data_config.encoding.as_ref() {
1013                    Some(encoding) => match &encoding.key {
1014                        Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1015                        None => (None, Some(encoding.value.type_())),
1016                    },
1017                    None => (None, None),
1018                }
1019            }
1020            DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1021                Some(encoding) => match &encoding.key {
1022                    Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1023                    None => (None, Some(encoding.value.type_())),
1024                },
1025                None => (None, None),
1026            },
1027            DataSourceDesc::Introspection(_)
1028            | DataSourceDesc::Webhook { .. }
1029            | DataSourceDesc::Progress
1030            | DataSourceDesc::Catalog => (None, None),
1031        }
1032    }
1033
1034    /// Envelope of the data source.
1035    pub fn envelope(&self) -> Option<&str> {
1036        // Note how "none"/"append-only" is different from `None`. Source
1037        // sources don't have an envelope (internal logs, for example), while
1038        // other sources have an envelope that we call the "NONE"-envelope.
1039
1040        fn envelope_string(envelope: &SourceEnvelope) -> &str {
1041            match envelope {
1042                SourceEnvelope::None(_) => "none",
1043                SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1044                    mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1045                    mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1046                        // NOTE(aljoscha): Should we somehow mark that this is
1047                        // using upsert internally? See note above about
1048                        // DEBEZIUM.
1049                        "debezium"
1050                    }
1051                    mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1052                        "upsert-value-err-inline"
1053                    }
1054                },
1055                SourceEnvelope::CdcV2 => {
1056                    // TODO(aljoscha): Should we even report this? It's
1057                    // currently not exposed.
1058                    "materialize"
1059                }
1060            }
1061        }
1062
1063        match self {
1064            // NOTE(aljoscha): We could move the block for ingestions into
1065            // `SourceEnvelope` itself, but that one feels more like an internal
1066            // thing and adapter should own how we represent envelopes as a
1067            // string? It would not be hard to convince me otherwise, though.
1068            DataSourceDesc::Ingestion { .. } => None,
1069            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1070                Some(envelope_string(&data_config.envelope))
1071            }
1072            DataSourceDesc::IngestionExport { data_config, .. } => {
1073                Some(envelope_string(&data_config.envelope))
1074            }
1075            DataSourceDesc::Introspection(_)
1076            | DataSourceDesc::Webhook { .. }
1077            | DataSourceDesc::Progress
1078            | DataSourceDesc::Catalog => None,
1079        }
1080    }
1081}
1082
/// An in-memory representation of a source.
#[derive(Debug, Clone, Serialize)]
pub struct Source {
    /// Parse-able SQL that defines this source.
    pub create_sql: Option<String>,
    /// [`GlobalId`] used to reference this source from outside the catalog.
    pub global_id: GlobalId,
    /// Description of where this source's data comes from.
    // TODO: Unskip: currently blocked on some inner BTreeMap<X, _> problems.
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    /// [`RelationDesc`] of this source, derived from the `create_sql`.
    pub desc: RelationDesc,
    /// The timeline this source exists on.
    pub timeline: Timeline,
    /// Other catalog objects referenced by this source, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window for this source, if any.
    ///
    /// This value is ignored for subsources, i.e. for
    /// [`DataSourceDesc::IngestionExport`]. Instead, it uses the primary
    /// source's logical compaction window.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the source's logical compaction window is controlled by
    /// METRICS_RETENTION
    pub is_retained_metrics_object: bool,
}
1106
impl Source {
    /// Creates a new `Source`.
    ///
    /// Translates the planner's `CreateSourcePlan` into the catalog's
    /// in-memory representation, validating cluster assignments as it goes.
    ///
    /// # Panics
    /// - If an ingestion-based plan is not given a cluster_id.
    /// - If a non-ingestion-based source has a defined cluster config in its plan.
    /// - If a non-ingestion-based source is given a cluster_id.
    pub fn new(
        plan: CreateSourcePlan,
        global_id: GlobalId,
        resolved_ids: ResolvedIds,
        custom_logical_compaction_window: Option<CompactionWindow>,
        is_retained_metrics_object: bool,
    ) -> Source {
        Source {
            create_sql: Some(plan.source.create_sql),
            // Translate the planner-level data source description into the
            // catalog's `DataSourceDesc`, enforcing the cluster invariants
            // documented under `# Panics`.
            data_source: match plan.source.data_source {
                mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
                    desc,
                    cluster_id: plan
                        .in_cluster
                        .expect("ingestion-based sources must be given a cluster ID"),
                },
                mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
                    desc,
                    progress_subsource,
                    data_config,
                    details,
                } => DataSourceDesc::OldSyntaxIngestion {
                    desc,
                    cluster_id: plan
                        .in_cluster
                        .expect("ingestion-based sources must be given a cluster ID"),
                    progress_subsource,
                    data_config,
                    details,
                },
                mz_sql::plan::DataSourceDesc::Progress => {
                    // Progress subsources run as part of their primary source
                    // and must not carry their own cluster assignment.
                    assert!(
                        plan.in_cluster.is_none(),
                        "subsources must not have a host config or cluster_id defined"
                    );
                    DataSourceDesc::Progress
                }
                mz_sql::plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => {
                    // Ingestion exports (subsources) inherit their primary
                    // source's cluster and must not define their own.
                    assert!(
                        plan.in_cluster.is_none(),
                        "subsources must not have a host config or cluster_id defined"
                    );
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    }
                }
                mz_sql::plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id,
                } => {
                    // Webhook sources get their cluster from the plan-level
                    // `in_cluster`; the variant's own `cluster_id` is expected
                    // to be unset. Log (rather than panic) if it isn't.
                    mz_ore::soft_assert_or_log!(
                        cluster_id.is_none(),
                        "cluster_id set at Source level for Webhooks"
                    );
                    DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id: plan
                            .in_cluster
                            .expect("webhook sources must be given a cluster ID"),
                    }
                }
            },
            desc: plan.source.desc,
            global_id,
            timeline: plan.timeline,
            resolved_ids,
            // A compaction window specified in the plan takes precedence over
            // the caller-supplied default.
            custom_logical_compaction_window: plan
                .source
                .compaction_window
                .or(custom_logical_compaction_window),
            is_retained_metrics_object,
        }
    }

    /// Type of the source, e.g. the name of its connection, or "subsource",
    /// "progress", "webhook", etc. for the non-ingestion variants.
    pub fn source_type(&self) -> &str {
        match &self.data_source {
            DataSourceDesc::Ingestion { desc, .. }
            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
            DataSourceDesc::Progress => "progress",
            DataSourceDesc::IngestionExport { .. } => "subsource",
            DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => "source",
            DataSourceDesc::Webhook { .. } => "webhook",
        }
    }

    /// Connection ID of the source, if one exists.
    ///
    /// Only ingestion-based sources reference a connection; all other
    /// variants return `None`.
    pub fn connection_id(&self) -> Option<CatalogItemId> {
        match &self.data_source {
            DataSourceDesc::Ingestion { desc, .. }
            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
            DataSourceDesc::IngestionExport { .. }
            | DataSourceDesc::Introspection(_)
            | DataSourceDesc::Webhook { .. }
            | DataSourceDesc::Progress
            | DataSourceDesc::Catalog => None,
        }
    }

    /// The single [`GlobalId`] that refers to this Source.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }

    /// The expensive resource that each source consumes is persist shards. To
    /// prevent abuse, we want to prevent users from creating sources that use an
    /// unbounded number of persist shards. But we also don't want to count
    /// persist shards that are mandated by the system (e.g., the progress
    /// shard) so that future versions of Materialize can introduce additional
    /// per-source shards (e.g., a per-source status shard) without impacting
    /// the limit calculation.
    pub fn user_controllable_persist_shard_count(&self) -> i64 {
        match &self.data_source {
            DataSourceDesc::Ingestion { .. } => 0,
            DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                match &desc.connection {
                    // These multi-output sources do not use their primary
                    // source's data shard, so we don't include it in accounting
                    // for users.
                    GenericSourceConnection::Postgres(_)
                    | GenericSourceConnection::MySql(_)
                    | GenericSourceConnection::SqlServer(_) => 0,
                    GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
                        // Load generators that output data in their primary shard
                        LoadGenerator::Clock
                        | LoadGenerator::Counter { .. }
                        | LoadGenerator::Datums
                        | LoadGenerator::KeyValue(_) => 1,
                        // Multi-output load generators; like the multi-output
                        // connections above, their primary shard isn't counted.
                        LoadGenerator::Auction
                        | LoadGenerator::Marketing
                        | LoadGenerator::Tpch { .. } => 0,
                    },
                    GenericSourceConnection::Kafka(_) => 1,
                }
            }
            //  DataSourceDesc::IngestionExport represents a subsource, which
            //  use a data shard.
            DataSourceDesc::IngestionExport { .. } => 1,
            DataSourceDesc::Webhook { .. } => 1,
            // Introspection, catalog, and progress subsources are not under the user's control, so
            // shouldn't count toward their quota.
            DataSourceDesc::Introspection(_)
            | DataSourceDesc::Progress
            | DataSourceDesc::Catalog => 0,
        }
    }
}
1273
/// A built-in log source, described by a [`LogVariant`].
#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// The category of data this log stores.
    pub variant: LogVariant,
    /// [`GlobalId`] used to reference this log from outside the catalog.
    pub global_id: GlobalId,
}
1281
1282impl Log {
1283    /// The single [`GlobalId`] that refers to this Log.
1284    pub fn global_id(&self) -> GlobalId {
1285        self.global_id
1286    }
1287}
1288
/// A sink, which writes data from a collection out to an external system.
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    /// Parse-able SQL that defines this sink.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this sink from outside the catalog, e.g storage.
    pub global_id: GlobalId,
    /// Collection we read into this sink.
    pub from: GlobalId,
    /// Connection to the external service we're sinking into, e.g. Kafka.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// Envelope we use to sink into the external system.
    ///
    /// TODO(guswynn): this probably should just be in the `connection`.
    pub envelope: SinkEnvelope,
    /// Emit an initial snapshot into the sink.
    pub with_snapshot: bool,
    /// Used to fence other writes into this sink as we evolve the upstream materialized view.
    pub version: u64,
    /// Other catalog objects this sink references.
    pub resolved_ids: ResolvedIds,
    /// Cluster this sink runs on.
    pub cluster_id: ClusterId,
    /// Commit interval for the sink.
    pub commit_interval: Option<Duration>,
}
1314
1315impl Sink {
1316    pub fn sink_type(&self) -> &str {
1317        self.connection.name()
1318    }
1319
1320    /// Envelope of the sink.
1321    pub fn envelope(&self) -> Option<&str> {
1322        match &self.envelope {
1323            SinkEnvelope::Debezium => Some("debezium"),
1324            SinkEnvelope::Upsert => Some("upsert"),
1325            SinkEnvelope::Append => Some("append"),
1326        }
1327    }
1328
1329    /// Output a combined format string of the sink. For legacy reasons
1330    /// if the key-format is none or the key & value formats are
1331    /// both the same (either avro or json), we return the value format name,
1332    /// otherwise we return a composite name.
1333    pub fn combined_format(&self) -> Option<Cow<'_, str>> {
1334        match &self.connection {
1335            StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
1336            StorageSinkConnection::Iceberg(_) => None,
1337        }
1338    }
1339
1340    /// Output distinct key_format and value_format of the sink.
1341    pub fn formats(&self) -> Option<(Option<&str>, &str)> {
1342        match &self.connection {
1343            StorageSinkConnection::Kafka(connection) => {
1344                let key_format = connection
1345                    .format
1346                    .key_format
1347                    .as_ref()
1348                    .map(|f| f.get_format_name());
1349                let value_format = connection.format.value_format.get_format_name();
1350                Some((key_format, value_format))
1351            }
1352            StorageSinkConnection::Iceberg(_) => None,
1353        }
1354    }
1355
1356    pub fn connection_id(&self) -> Option<CatalogItemId> {
1357        self.connection.connection_id()
1358    }
1359
1360    /// The single [`GlobalId`] that this Sink can be referenced by.
1361    pub fn global_id(&self) -> GlobalId {
1362        self.global_id
1363    }
1364}
1365
/// An in-memory representation of a SQL view.
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// Parse-able SQL that defines this view.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this view from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// Unoptimized high-level expression from parsing the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression from (locally) optimizing the `raw_expr`.
    pub locally_optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Columns of this view.
    pub desc: RelationDesc,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects that are referenced by this view, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this view.
    pub dependencies: DependencyIds,
}
1385
1386impl View {
1387    /// The single [`GlobalId`] this [`View`] can be referenced by.
1388    pub fn global_id(&self) -> GlobalId {
1389        self.global_id
1390    }
1391}
1392
/// An in-memory representation of a materialized view: a view whose results
/// are computed by a dataflow and written to storage collections.
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// Parse-able SQL that defines this materialized view.
    pub create_sql: String,
    /// Versions of this materialized view, and the [`GlobalId`]s that refer to them.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// Raw high-level expression from planning, derived from the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression, derived from the `raw_expr`.
    pub locally_optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// [`VersionedRelationDesc`] of this materialized view, derived from the `create_sql`.
    pub desc: VersionedRelationDesc,
    /// Other catalog items that this materialized view references, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this view.
    pub dependencies: DependencyIds,
    /// ID of the materialized view this materialized view is intended to replace.
    pub replacement_target: Option<CatalogItemId>,
    /// Cluster that this materialized view runs on.
    pub cluster_id: ClusterId,
    /// If set, only install this materialized view's dataflow on the specified replica.
    pub target_replica: Option<ReplicaId>,
    /// Column indexes that we assert are not `NULL`.
    ///
    /// TODO(parkmycar): Switch this to use the `ColumnIdx` type.
    pub non_null_assertions: Vec<usize>,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Schedule to refresh this materialized view, e.g. set via `REFRESH EVERY` option.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// The initial `as_of` of the storage collection associated with the materialized view.
    ///
    /// Note: This doesn't change upon restarts.
    /// (The dataflow's initial `as_of` can be different.)
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
    // The catalog `dump` method uses serde to serialize catalog state, e.g., Testdrive catalog
    // consistency checks do two dumps and compare them. One of these states comes from the durable
    // catalog, but the following fields are not restored when the consistency check loads the
    // durable catalog, hence we need `#[serde(skip)]`.
    /// Optimized global MIR plan, set after global optimization.
    #[serde(skip)]
    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical (LIR) plan, set after physical optimization.
    #[serde(skip)]
    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
    /// Dataflow metainfo (optimizer notices, etc.), set after optimization.
    #[serde(skip)]
    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
}
1443
impl MaterializedView {
    /// Returns all [`GlobalId`]s that this [`MaterializedView`] can be referenced by.
    ///
    /// There is one [`GlobalId`] per [`RelationVersion`].
    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.collections.values().copied()
    }

    /// The latest [`GlobalId`] for this [`MaterializedView`] which represents the writing
    /// version.
    pub fn global_id_writes(&self) -> GlobalId {
        *self
            .collections
            .last_key_value()
            .expect("at least one version of a materialized view")
            .1
    }

    /// Returns all collections and their [`RelationDesc`]s associated with this [`MaterializedView`].
    ///
    /// Each yielded tuple pairs a version's [`GlobalId`] with the
    /// [`RelationDesc`] as of that [`RelationVersion`].
    pub fn collection_descs(
        &self,
    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
        self.collections.iter().map(|(version, gid)| {
            let desc = self
                .desc
                .at_version(RelationVersionSelector::Specific(*version));
            (*gid, *version, desc)
        })
    }

    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
    ///
    /// Panics if `id` is not one of this materialized view's collections.
    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
        let (version, _gid) = self
            .collections
            .iter()
            .find(|(_version, gid)| *gid == id)
            .expect("GlobalId to exist");
        self.desc
            .at_version(RelationVersionSelector::Specific(*version))
    }

    /// Apply the given replacement materialized view to this [`MaterializedView`].
    ///
    /// Rewrites this MV's `create_sql` to keep its own name but adopt the
    /// replacement's definition, records the replacement's write collection as
    /// a new version, and takes over the replacement's optimizer state.
    ///
    /// Panics if `replacement` has no `replacement_target`, or if either
    /// `create_sql` fails to parse as a `CREATE MATERIALIZED VIEW` statement.
    pub fn apply_replacement(&mut self, replacement: Self) {
        let target_id = replacement
            .replacement_target
            .expect("replacement has target");

        // Parses a catalog-persisted `create_sql` string back into a
        // `CREATE MATERIALIZED VIEW` AST; anything else is a catalog bug.
        fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
            let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
                panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
            });
            if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
                cmvs
            } else {
                panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
            }
        }

        // Build the new statement: keep the original's name, take everything
        // else (columns, cluster, query, options) from the replacement.
        let old_stmt = parse(&self.create_sql);
        let rpl_stmt = parse(&replacement.create_sql);
        let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
            if_exists: old_stmt.if_exists,
            name: old_stmt.name,
            columns: rpl_stmt.columns,
            replacement_for: None,
            in_cluster: rpl_stmt.in_cluster,
            in_cluster_replica: rpl_stmt.in_cluster_replica,
            query: rpl_stmt.query,
            as_of: rpl_stmt.as_of,
            with_options: rpl_stmt.with_options,
        };
        let create_sql = new_stmt.to_ast_string_stable();

        let mut collections = std::mem::take(&mut self.collections);
        // Note: We can't use `self.desc.latest_version` here because a replacement doesn't
        // necessary evolve the relation schema, so that version might be lower than the actual
        // latest version.
        let latest_version = collections.keys().max().expect("at least one version");
        let new_version = latest_version.bump();
        collections.insert(new_version, replacement.global_id_writes());

        // The replacement referenced the MV it replaces; drop that self-edge
        // from the merged dependency sets.
        let mut resolved_ids = replacement.resolved_ids;
        resolved_ids.remove_item(&target_id);
        let mut dependencies = replacement.dependencies;
        dependencies.0.remove(&target_id);

        *self = Self {
            create_sql,
            collections,
            raw_expr: replacement.raw_expr,
            locally_optimized_expr: replacement.locally_optimized_expr,
            desc: replacement.desc,
            resolved_ids,
            dependencies,
            replacement_target: None,
            cluster_id: replacement.cluster_id,
            target_replica: replacement.target_replica,
            non_null_assertions: replacement.non_null_assertions,
            custom_logical_compaction_window: replacement.custom_logical_compaction_window,
            refresh_schedule: replacement.refresh_schedule,
            initial_as_of: replacement.initial_as_of,
            optimized_plan: replacement.optimized_plan,
            physical_plan: replacement.physical_plan,
            dataflow_metainfo: replacement.dataflow_metainfo,
        };
    }
}
1549
/// An in-memory representation of an index.
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// Parse-able SQL that defines this index.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this index from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// The [`GlobalId`] this Index is on.
    pub on: GlobalId,
    /// Keys of the index.
    pub keys: Arc<[MirScalarExpr]>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this index, e.g. the object we're indexing.
    pub resolved_ids: ResolvedIds,
    /// Cluster this index is installed on.
    pub cluster_id: ClusterId,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the index's logical compaction window is controlled by the ['metrics_retention']
    /// session variable.
    ///
    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
    // The catalog `dump` method uses serde to serialize catalog state, e.g., Testdrive catalog
    // consistency checks do two dumps and compare them. One of these states comes from the durable
    // catalog, but the following fields are not restored when the consistency check loads the
    // durable catalog, hence we need `#[serde(skip)]`.
    /// Optimized global MIR plan, set after global optimization.
    #[serde(skip)]
    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical (LIR) plan, set after physical optimization.
    #[serde(skip)]
    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
    /// Dataflow metainfo (optimizer notices, etc.), set after optimization.
    #[serde(skip)]
    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
}
1587
1588impl Index {
1589    /// The [`GlobalId`] that refers to this Index.
1590    pub fn global_id(&self) -> GlobalId {
1591        self.global_id
1592    }
1593}
1594
/// An in-memory representation of a user- or system-defined type.
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// Parse-able SQL that defines this type.
    pub create_sql: Option<String>,
    /// [`GlobalId`] used to reference this type from outside the catalog.
    pub global_id: GlobalId,
    /// Structural details of the type; not serialized.
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// Other catalog objects referenced by this type.
    pub resolved_ids: ResolvedIds,
}
1606
/// An in-memory representation of a built-in SQL function.
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// Static definition of the function.
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
    /// [`GlobalId`] used to reference this function from outside the catalog.
    pub global_id: GlobalId,
}
1615
/// An in-memory representation of a secret.
///
/// Note that the secret's value itself is not part of this representation.
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// Parse-able SQL that defines this secret.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this secret from outside the catalog.
    pub global_id: GlobalId,
}
1623
/// An in-memory representation of a connection to an external system.
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// Parse-able SQL that defines this connection.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this connection from the storage layer.
    pub global_id: GlobalId,
    /// The kind of connection.
    pub details: ConnectionDetails,
    /// Other objects this connection depends on.
    pub resolved_ids: ResolvedIds,
}
1635
1636impl Connection {
1637    /// The single [`GlobalId`] used to reference this connection.
1638    pub fn global_id(&self) -> GlobalId {
1639        self.global_id
1640    }
1641}
1642
/// An in-memory representation of a network policy.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
    /// Name of this network policy.
    pub name: String,
    /// Stable ID of this network policy.
    pub id: NetworkPolicyId,
    /// OID of this network policy.
    pub oid: u32,
    /// The rules that make up this policy.
    pub rules: Vec<NetworkPolicyRule>,
    /// The role that owns this policy.
    pub owner_id: RoleId,
    /// Privileges granted on this policy.
    pub privileges: PrivilegeMap,
}
1652
1653impl From<NetworkPolicy> for durable::NetworkPolicy {
1654    fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1655        durable::NetworkPolicy {
1656            id: policy.id,
1657            oid: policy.oid,
1658            name: policy.name,
1659            rules: policy.rules,
1660            owner_id: policy.owner_id,
1661            privileges: policy.privileges.into_all_values().collect(),
1662        }
1663    }
1664}
1665
1666impl From<durable::NetworkPolicy> for NetworkPolicy {
1667    fn from(
1668        durable::NetworkPolicy {
1669            id,
1670            oid,
1671            name,
1672            rules,
1673            owner_id,
1674            privileges,
1675        }: durable::NetworkPolicy,
1676    ) -> Self {
1677        NetworkPolicy {
1678            id,
1679            oid,
1680            name,
1681            rules,
1682            owner_id,
1683            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1684        }
1685    }
1686}
1687
1688impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1689    fn update_from(
1690        &mut self,
1691        durable::NetworkPolicy {
1692            id,
1693            oid,
1694            name,
1695            rules,
1696            owner_id,
1697            privileges,
1698        }: durable::NetworkPolicy,
1699    ) {
1700        self.id = id;
1701        self.oid = oid;
1702        self.name = name;
1703        self.rules = rules;
1704        self.owner_id = owner_id;
1705        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1706    }
1707}
1708
1709impl CatalogItem {
1710    /// Returns a string indicating the type of this catalog entry.
1711    pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1712        match self {
1713            CatalogItem::Table(_) => CatalogItemType::Table,
1714            CatalogItem::Source(_) => CatalogItemType::Source,
1715            CatalogItem::Log(_) => CatalogItemType::Source,
1716            CatalogItem::Sink(_) => CatalogItemType::Sink,
1717            CatalogItem::View(_) => CatalogItemType::View,
1718            CatalogItem::MaterializedView(_) => CatalogItemType::MaterializedView,
1719            CatalogItem::Index(_) => CatalogItemType::Index,
1720            CatalogItem::Type(_) => CatalogItemType::Type,
1721            CatalogItem::Func(_) => CatalogItemType::Func,
1722            CatalogItem::Secret(_) => CatalogItemType::Secret,
1723            CatalogItem::Connection(_) => CatalogItemType::Connection,
1724        }
1725    }
1726
1727    /// Returns the [`GlobalId`]s that reference this item, if any.
1728    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1729        let gid = match self {
1730            CatalogItem::Source(source) => source.global_id,
1731            CatalogItem::Log(log) => log.global_id,
1732            CatalogItem::Sink(sink) => sink.global_id,
1733            CatalogItem::View(view) => view.global_id,
1734            CatalogItem::MaterializedView(mv) => {
1735                return itertools::Either::Left(mv.collections.values().copied());
1736            }
1737            CatalogItem::Index(index) => index.global_id,
1738            CatalogItem::Func(func) => func.global_id,
1739            CatalogItem::Type(ty) => ty.global_id,
1740            CatalogItem::Secret(secret) => secret.global_id,
1741            CatalogItem::Connection(conn) => conn.global_id,
1742            CatalogItem::Table(table) => {
1743                return itertools::Either::Left(table.collections.values().copied());
1744            }
1745        };
1746        itertools::Either::Right(std::iter::once(gid))
1747    }
1748
1749    /// Returns the most up-to-date [`GlobalId`] for this item.
1750    ///
1751    /// Note: The only type of object that can have multiple [`GlobalId`]s are tables.
1752    pub fn latest_global_id(&self) -> GlobalId {
1753        match self {
1754            CatalogItem::Source(source) => source.global_id,
1755            CatalogItem::Log(log) => log.global_id,
1756            CatalogItem::Sink(sink) => sink.global_id,
1757            CatalogItem::View(view) => view.global_id,
1758            CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
1759            CatalogItem::Index(index) => index.global_id,
1760            CatalogItem::Func(func) => func.global_id,
1761            CatalogItem::Type(ty) => ty.global_id,
1762            CatalogItem::Secret(secret) => secret.global_id,
1763            CatalogItem::Connection(conn) => conn.global_id,
1764            CatalogItem::Table(table) => table.global_id_writes(),
1765        }
1766    }
1767
1768    /// Returns the optimized global MIR plan, if this item has one.
1769    pub fn optimized_plan(&self) -> Option<&Arc<DataflowDescription<OptimizedMirRelationExpr>>> {
1770        match self {
1771            CatalogItem::Index(idx) => idx.optimized_plan.as_ref(),
1772            CatalogItem::MaterializedView(mv) => mv.optimized_plan.as_ref(),
1773            _ => None,
1774        }
1775    }
1776
1777    /// Returns the physical (LIR) plan, if this item has one.
1778    pub fn physical_plan(&self) -> Option<&Arc<DataflowDescription<ComputePlan>>> {
1779        match self {
1780            CatalogItem::Index(idx) => idx.physical_plan.as_ref(),
1781            CatalogItem::MaterializedView(mv) => mv.physical_plan.as_ref(),
1782            _ => None,
1783        }
1784    }
1785
1786    /// Returns the dataflow metainfo, if this item has one.
1787    pub fn dataflow_metainfo(&self) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
1788        match self {
1789            CatalogItem::Index(idx) => idx.dataflow_metainfo.as_ref(),
1790            CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo.as_ref(),
1791            _ => None,
1792        }
1793    }
1794
1795    /// Returns mutable references to the plan fields (`optimized_plan`,
1796    /// `physical_plan`, `dataflow_metainfo`) on plan-bearing items
1797    /// (`Index`, `MaterializedView`), or `None` for
1798    /// other item kinds.
1799    pub fn plan_fields_mut(
1800        &mut self,
1801    ) -> Option<(
1802        &mut Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
1803        &mut Option<Arc<DataflowDescription<ComputePlan>>>,
1804        &mut Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
1805    )> {
1806        match self {
1807            CatalogItem::Index(idx) => Some((
1808                &mut idx.optimized_plan,
1809                &mut idx.physical_plan,
1810                &mut idx.dataflow_metainfo,
1811            )),
1812            CatalogItem::MaterializedView(mv) => Some((
1813                &mut mv.optimized_plan,
1814                &mut mv.physical_plan,
1815                &mut mv.dataflow_metainfo,
1816            )),
1817            _ => None,
1818        }
1819    }
1820
1821    /// Whether this item represents a storage collection.
1822    pub fn is_storage_collection(&self) -> bool {
1823        match self {
1824            CatalogItem::Table(_)
1825            | CatalogItem::Source(_)
1826            | CatalogItem::MaterializedView(_)
1827            | CatalogItem::Sink(_) => true,
1828            CatalogItem::Log(_)
1829            | CatalogItem::View(_)
1830            | CatalogItem::Index(_)
1831            | CatalogItem::Type(_)
1832            | CatalogItem::Func(_)
1833            | CatalogItem::Secret(_)
1834            | CatalogItem::Connection(_) => false,
1835        }
1836    }
1837
1838    /// Returns the [`RelationDesc`] for items that yield rows, at the requested
1839    /// version.
1840    ///
1841    /// Some item types honor `version` so callers can ask for the schema that
1842    /// matches a specific [`GlobalId`] or historical definition. Other relation
1843    /// types ignore `version` because they have a single shape. Non-relational
1844    /// items ( for example functions, indexes, sinks, secrets, and connections)
1845    /// return `None`.
1846    pub fn relation_desc(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1847        match &self {
1848            CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1849            CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1850            CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1851            CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1852            CatalogItem::MaterializedView(mview) => {
1853                Some(Cow::Owned(mview.desc.at_version(version)))
1854            }
1855            CatalogItem::Func(_)
1856            | CatalogItem::Index(_)
1857            | CatalogItem::Sink(_)
1858            | CatalogItem::Secret(_)
1859            | CatalogItem::Connection(_)
1860            | CatalogItem::Type(_) => None,
1861        }
1862    }
1863
1864    pub fn func(
1865        &self,
1866        entry: &CatalogEntry,
1867    ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1868        match &self {
1869            CatalogItem::Func(func) => Ok(func.inner),
1870            _ => Err(SqlCatalogError::UnexpectedType {
1871                name: entry.name().item.to_string(),
1872                actual_type: entry.item_type(),
1873                expected_type: CatalogItemType::Func,
1874            }),
1875        }
1876    }
1877
1878    pub fn source_desc(
1879        &self,
1880        entry: &CatalogEntry,
1881    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1882        match &self {
1883            CatalogItem::Source(source) => match &source.data_source {
1884                DataSourceDesc::Ingestion { desc, .. }
1885                | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1886                DataSourceDesc::IngestionExport { .. }
1887                | DataSourceDesc::Introspection(_)
1888                | DataSourceDesc::Webhook { .. }
1889                | DataSourceDesc::Progress
1890                | DataSourceDesc::Catalog => Ok(None),
1891            },
1892            _ => Err(SqlCatalogError::UnexpectedType {
1893                name: entry.name().item.to_string(),
1894                actual_type: entry.item_type(),
1895                expected_type: CatalogItemType::Source,
1896            }),
1897        }
1898    }
1899
1900    /// Reports whether this catalog entry is a progress source.
1901    pub fn is_progress_source(&self) -> bool {
1902        matches!(
1903            self,
1904            CatalogItem::Source(Source {
1905                data_source: DataSourceDesc::Progress,
1906                ..
1907            })
1908        )
1909    }
1910
1911    /// Collects the identifiers of the objects that were encountered when resolving names in the
1912    /// item's DDL statement.
1913    pub fn references(&self) -> &ResolvedIds {
1914        static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1915        match self {
1916            CatalogItem::Func(_) => &*EMPTY,
1917            CatalogItem::Index(idx) => &idx.resolved_ids,
1918            CatalogItem::Sink(sink) => &sink.resolved_ids,
1919            CatalogItem::Source(source) => &source.resolved_ids,
1920            CatalogItem::Log(_) => &*EMPTY,
1921            CatalogItem::Table(table) => &table.resolved_ids,
1922            CatalogItem::Type(typ) => &typ.resolved_ids,
1923            CatalogItem::View(view) => &view.resolved_ids,
1924            CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1925            CatalogItem::Secret(_) => &*EMPTY,
1926            CatalogItem::Connection(connection) => &connection.resolved_ids,
1927        }
1928    }
1929
1930    /// Collects the identifiers of the objects used by this [`CatalogItem`].
1931    ///
1932    /// Like [`CatalogItem::references()`] but also includes objects that are not directly
1933    /// referenced. For example this will include any catalog objects used to implement functions
1934    /// and casts in the item.
1935    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1936        let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1937        match self {
1938            // TODO(jkosh44) This isn't really correct for functions. They may use other objects in
1939            // their implementation. However, currently there's no way to get that information.
1940            CatalogItem::Func(_) => {}
1941            CatalogItem::Index(_) => {}
1942            CatalogItem::Sink(_) => {}
1943            CatalogItem::Source(_) => {}
1944            CatalogItem::Log(_) => {}
1945            CatalogItem::Table(_) => {}
1946            CatalogItem::Type(_) => {}
1947            CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1948            CatalogItem::MaterializedView(mview) => {
1949                uses.extend(mview.dependencies.0.iter().copied())
1950            }
1951            CatalogItem::Secret(_) => {}
1952            CatalogItem::Connection(_) => {}
1953        }
1954        uses
1955    }
1956
1957    /// Returns the connection ID that this item belongs to, if this item is
1958    /// temporary.
1959    pub fn conn_id(&self) -> Option<&ConnectionId> {
1960        match self {
1961            CatalogItem::View(view) => view.conn_id.as_ref(),
1962            CatalogItem::Index(index) => index.conn_id.as_ref(),
1963            CatalogItem::Table(table) => table.conn_id.as_ref(),
1964            CatalogItem::Log(_)
1965            | CatalogItem::Source(_)
1966            | CatalogItem::Sink(_)
1967            | CatalogItem::MaterializedView(_)
1968            | CatalogItem::Secret(_)
1969            | CatalogItem::Type(_)
1970            | CatalogItem::Func(_)
1971            | CatalogItem::Connection(_) => None,
1972        }
1973    }
1974
1975    /// Sets the connection ID that this item belongs to, which makes it a
1976    /// temporary item.
1977    pub fn set_conn_id(&mut self, conn_id: Option<ConnectionId>) {
1978        match self {
1979            CatalogItem::View(view) => view.conn_id = conn_id,
1980            CatalogItem::Index(index) => index.conn_id = conn_id,
1981            CatalogItem::Table(table) => table.conn_id = conn_id,
1982            CatalogItem::Log(_)
1983            | CatalogItem::Source(_)
1984            | CatalogItem::Sink(_)
1985            | CatalogItem::MaterializedView(_)
1986            | CatalogItem::Secret(_)
1987            | CatalogItem::Type(_)
1988            | CatalogItem::Func(_)
1989            | CatalogItem::Connection(_) => (),
1990        }
1991    }
1992
    /// Indicates whether this item is temporary or not.
    ///
    /// An item is temporary iff it is bound to a connection; see
    /// [`CatalogItem::conn_id`].
    pub fn is_temporary(&self) -> bool {
        self.conn_id().is_some()
    }
1997
    /// Returns a clone of `self` whose `create_sql` has every reference to
    /// schema `cur_schema_name` within `database_name` rewritten to
    /// `new_schema_name`.
    ///
    /// Logs and funcs carry no `create_sql` and are cloned unchanged. On
    /// failure, forwards the `(String, String)` error produced by the AST
    /// transform.
    pub fn rename_schema_refs(
        &self,
        database_name: &str,
        cur_schema_name: &str,
        new_schema_name: &str,
    ) -> Result<CatalogItem, (String, String)> {
        // Parse the stored SQL, rewrite schema references in the AST, and
        // re-render it in the stable string form.
        let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
            let mut create_stmt = mz_sql::parse::parse(&create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast;

            // Rename all references to cur_schema_name.
            mz_sql::ast::transform::create_stmt_rename_schema_refs(
                &mut create_stmt,
                database_name,
                cur_schema_name,
                new_schema_name,
            )?;

            Ok(create_stmt.to_ast_string_stable())
        };

        match self {
            // Tables, sources, and types hold optional SQL; rewrite only when
            // it is present.
            CatalogItem::Table(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Table(i))
            }
            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
            CatalogItem::Source(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Source(i))
            }
            CatalogItem::Sink(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Sink(i))
            }
            CatalogItem::View(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::View(i))
            }
            CatalogItem::MaterializedView(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::MaterializedView(i))
            }
            CatalogItem::Index(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Index(i))
            }
            CatalogItem::Secret(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Secret(i))
            }
            CatalogItem::Connection(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Connection(i))
            }
            CatalogItem::Type(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Type(i))
            }
            CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
        }
    }
2071
    /// Returns a clone of `self` with all instances of `from` renamed to `to`
    /// (with the option of including the item's own name) or errors if request
    /// is ambiguous.
    ///
    /// When `rename_self` is true, the item's own name inside its
    /// `create_sql` is also rewritten to `to_item_name`. Logs carry no SQL
    /// and are cloned unchanged.
    ///
    /// # Panics
    ///
    /// Panics (`unreachable!`) if called on a func or type, which cannot be
    /// renamed.
    pub fn rename_item_refs(
        &self,
        from: FullItemName,
        to_item_name: String,
        rename_self: bool,
    ) -> Result<CatalogItem, String> {
        // Parse the stored SQL, apply the rename(s) to the AST, and re-render
        // it in the stable string form.
        let do_rewrite = |create_sql: String| -> Result<String, String> {
            let mut create_stmt = mz_sql::parse::parse(&create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast;
            if rename_self {
                mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
            }
            // Determination of what constitutes an ambiguous request is done here.
            mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
            Ok(create_stmt.to_ast_string_stable())
        };

        match self {
            // Tables and sources hold optional SQL; rewrite only when present.
            CatalogItem::Table(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Table(i))
            }
            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
            CatalogItem::Source(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Source(i))
            }
            CatalogItem::Sink(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Sink(i))
            }
            CatalogItem::View(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::View(i))
            }
            CatalogItem::MaterializedView(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::MaterializedView(i))
            }
            CatalogItem::Index(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Index(i))
            }
            CatalogItem::Secret(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Secret(i))
            }
            CatalogItem::Func(_) | CatalogItem::Type(_) => {
                unreachable!("{}s cannot be renamed", self.typ())
            }
            CatalogItem::Connection(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Connection(i))
            }
        }
    }
2141
    /// Returns a clone of `self` with all instances of `old_id` replaced with `new_id`.
    ///
    /// Unlike [`CatalogItem::rename_item_refs`], this rewrite is infallible.
    /// Logs carry no SQL and are cloned unchanged.
    ///
    /// # Panics
    ///
    /// Panics (`unreachable!`) if called on a func or type, whose references
    /// cannot be replaced.
    pub fn replace_item_refs(&self, old_id: CatalogItemId, new_id: CatalogItemId) -> CatalogItem {
        // Parse the stored SQL, swap ids in the AST, and re-render it in the
        // stable string form.
        let do_rewrite = |create_sql: String| -> String {
            let mut create_stmt = mz_sql::parse::parse(&create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast;
            mz_sql::ast::transform::create_stmt_replace_ids(
                &mut create_stmt,
                &[(old_id, new_id)].into(),
            );
            create_stmt.to_ast_string_stable()
        };

        match self {
            // Tables and sources hold optional SQL; rewrite only when present.
            CatalogItem::Table(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite);
                CatalogItem::Table(i)
            }
            CatalogItem::Log(i) => CatalogItem::Log(i.clone()),
            CatalogItem::Source(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite);
                CatalogItem::Source(i)
            }
            CatalogItem::Sink(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::Sink(i)
            }
            CatalogItem::View(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::View(i)
            }
            CatalogItem::MaterializedView(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::MaterializedView(i)
            }
            CatalogItem::Index(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::Index(i)
            }
            CatalogItem::Secret(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::Secret(i)
            }
            CatalogItem::Func(_) | CatalogItem::Type(_) => {
                unreachable!("references of {}s cannot be replaced", self.typ())
            }
            CatalogItem::Connection(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::Connection(i)
            }
        }
    }
    /// Updates the retain history for an item. Returns the previous retain history value. Returns
    /// an error if this item does not support retain history.
    ///
    /// A `value` of `None` removes the option from the item's `create_sql`;
    /// `Some` replaces or appends it. `window` is stored as the item's
    /// in-memory compaction window in either case.
    pub fn update_retain_history(
        &mut self,
        value: Option<Value>,
        window: CompactionWindow,
    ) -> Result<Option<WithOptionValue<Raw>>, ()> {
        // Rewrites the statement's RetainHistory option in place, returning
        // the previous option value (if any).
        let update = |mut ast: &mut Statement<Raw>| {
            // Each statement type has unique option types. This macro handles them commonly.
            macro_rules! update_retain_history {
                ( $stmt:ident, $opt:ident, $name:ident ) => {{
                    // Replace or add the option.
                    let pos = $stmt
                        .with_options
                        .iter()
                        // In case there are ever multiple, look for the last one.
                        .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
                    if let Some(value) = value {
                        let next = mz_sql_parser::ast::$opt {
                            name: mz_sql_parser::ast::$name::RetainHistory,
                            value: Some(WithOptionValue::RetainHistoryFor(value)),
                        };
                        if let Some(idx) = pos {
                            let previous = $stmt.with_options[idx].clone();
                            $stmt.with_options[idx] = next;
                            previous.value
                        } else {
                            $stmt.with_options.push(next);
                            None
                        }
                    } else {
                        if let Some(idx) = pos {
                            // NOTE(review): `swap_remove` is O(1) but may
                            // reorder the remaining options in the rendered
                            // SQL — assumed not to be load-bearing; confirm.
                            $stmt.with_options.swap_remove(idx).value
                        } else {
                            None
                        }
                    }
                }};
            }
            // Only these statement kinds support RETAIN HISTORY.
            let previous = match &mut ast {
                Statement::CreateTable(stmt) => {
                    update_retain_history!(stmt, TableOption, TableOptionName)
                }
                Statement::CreateIndex(stmt) => {
                    update_retain_history!(stmt, IndexOption, IndexOptionName)
                }
                Statement::CreateSource(stmt) => {
                    update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
                }
                Statement::CreateMaterializedView(stmt) => {
                    update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
                }
                _ => {
                    return Err(());
                }
            };
            Ok(previous)
        };

        // First rewrite the SQL, then mirror the change into the in-memory
        // compaction window.
        let res = self.update_sql(update)?;
        let cw = self
            .custom_logical_compaction_window_mut()
            .expect("item must have compaction window");
        *cw = Some(window);
        Ok(res)
    }
2269
    /// Updates the timestamp interval for a source. Returns an error if this item is not a source.
    ///
    /// A `value` of `None` drops the timestamp-interval option from the
    /// source's `create_sql`; `Some` replaces or appends it. The in-memory
    /// [`SourceDesc`] is then updated to `interval`. Only ingestion-style
    /// sources are supported; all other source flavors (and non-sources)
    /// error.
    pub fn update_timestamp_interval(
        &mut self,
        value: Option<Value>,
        interval: Duration,
    ) -> Result<(), ()> {
        // Rewrites the TimestampInterval option in the CREATE SOURCE
        // statement.
        let update = |ast: &mut Statement<Raw>| {
            match ast {
                Statement::CreateSource(stmt) => {
                    // In case there are ever multiple, find the last one.
                    let pos = stmt.with_options.iter().rposition(|o| {
                        o.name == mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval
                    });
                    if let Some(value) = value {
                        let next = mz_sql_parser::ast::CreateSourceOption {
                            name: mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval,
                            value: Some(WithOptionValue::Value(value)),
                        };
                        if let Some(idx) = pos {
                            stmt.with_options[idx] = next;
                        } else {
                            stmt.with_options.push(next);
                        }
                    } else {
                        if let Some(idx) = pos {
                            // NOTE(review): `swap_remove` may reorder the
                            // remaining options in the rendered SQL — assumed
                            // not to be load-bearing; confirm.
                            stmt.with_options.swap_remove(idx);
                        }
                    }
                }
                _ => return Err(()),
            };
            Ok(())
        };

        self.update_sql(update)?;

        // Update the in-memory SourceDesc timestamp_interval.
        match self {
            CatalogItem::Source(source) => {
                match &mut source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                        desc.timestamp_interval = interval;
                    }
                    // Only true ingestions track a timestamp interval.
                    _ => return Err(()),
                }
                Ok(())
            }
            _ => Err(()),
        }
    }
2320
    /// Adds column `name` with SQL type `sql` (planned type `typ`) to this
    /// table, returning the new [`RelationVersion`].
    ///
    /// The column is appended to the in-memory [`VersionedRelationDesc`] and
    /// recorded in the table's `create_sql` as a versioned column definition
    /// (`Added` at the new version). Errors if `self` is not a table or the
    /// stored statement is not `CREATE TABLE`.
    pub fn add_column(
        &mut self,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    ) -> Result<RelationVersion, PlanError> {
        let CatalogItem::Table(table) = self else {
            return Err(PlanError::Unsupported {
                feature: "adding columns to a non-Table".to_string(),
                discussion_no: None,
            });
        };
        // Bump the relation version by registering the column on the desc.
        let next_version = table.desc.add_column(name.clone(), typ);

        let update = |mut ast: &mut Statement<Raw>| match &mut ast {
            Statement::CreateTable(stmt) => {
                // Tag the column with the relation version at which it was
                // added.
                let version = ColumnOptionDef {
                    name: None,
                    option: ColumnOption::Versioned {
                        action: ColumnVersioned::Added,
                        version: next_version.into(),
                    },
                };
                let column = ColumnDef {
                    name: name.into(),
                    data_type: sql,
                    collation: None,
                    options: vec![version],
                };
                stmt.columns.push(column);
                Ok(())
            }
            _ => Err(()),
        };

        self.update_sql(update)
            .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
        Ok(next_version)
    }
2360
    /// Updates the create_sql field of this item. Returns an error if this is a builtin item,
    /// otherwise returns f's result.
    ///
    /// The stored SQL is parsed, `f` may mutate the AST, and the stable
    /// string rendering of the result is written back. Funcs and logs carry
    /// no `create_sql` and always error.
    pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
    where
        F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
    {
        // Tables, types, and sources store optional SQL; the remaining
        // SQL-bearing kinds always have it.
        let create_sql = match self {
            CatalogItem::Table(Table { create_sql, .. })
            | CatalogItem::Type(Type { create_sql, .. })
            | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
            CatalogItem::Sink(Sink { create_sql, .. })
            | CatalogItem::View(View { create_sql, .. })
            | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
            | CatalogItem::Index(Index { create_sql, .. })
            | CatalogItem::Secret(Secret { create_sql, .. })
            | CatalogItem::Connection(Connection { create_sql, .. }) => Some(create_sql),
            CatalogItem::Func(_) | CatalogItem::Log(_) => None,
        };
        let Some(create_sql) = create_sql else {
            return Err(());
        };
        let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
            .expect("non-system items must be parseable")
            .into_element()
            .ast;
        debug!("rewrite: {}", ast.to_ast_string_redacted());
        let t = f(&mut ast)?;
        *create_sql = ast.to_ast_string_stable();
        debug!("rewrote: {}", ast.to_ast_string_redacted());
        Ok(t)
    }
2392
2393    /// If the object is considered a "compute object"
2394    /// (i.e., it is managed by the compute controller),
2395    /// this function returns its cluster ID. Otherwise, it returns nothing.
2396    ///
2397    /// This function differs from `cluster_id` because while all
2398    /// compute objects run on a cluster, the converse is not true.
2399    pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2400        match self {
2401            CatalogItem::Index(index) => Some(index.cluster_id),
2402            CatalogItem::Table(_)
2403            | CatalogItem::Source(_)
2404            | CatalogItem::Log(_)
2405            | CatalogItem::View(_)
2406            | CatalogItem::MaterializedView(_)
2407            | CatalogItem::Sink(_)
2408            | CatalogItem::Type(_)
2409            | CatalogItem::Func(_)
2410            | CatalogItem::Secret(_)
2411            | CatalogItem::Connection(_) => None,
2412        }
2413    }
2414
2415    pub fn cluster_id(&self) -> Option<ClusterId> {
2416        match self {
2417            CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2418            CatalogItem::Index(index) => Some(index.cluster_id),
2419            CatalogItem::Source(source) => match &source.data_source {
2420                DataSourceDesc::Ingestion { cluster_id, .. }
2421                | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
2422                // This is somewhat of a lie because the export runs on the same
2423                // cluster as its ingestion but we don't yet have a way of
2424                // cross-referencing the items
2425                DataSourceDesc::IngestionExport { .. } => None,
2426                DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2427                DataSourceDesc::Introspection(_)
2428                | DataSourceDesc::Progress
2429                | DataSourceDesc::Catalog => None,
2430            },
2431            CatalogItem::Sink(sink) => Some(sink.cluster_id),
2432            CatalogItem::Table(_)
2433            | CatalogItem::Log(_)
2434            | CatalogItem::View(_)
2435            | CatalogItem::Type(_)
2436            | CatalogItem::Func(_)
2437            | CatalogItem::Secret(_)
2438            | CatalogItem::Connection(_) => None,
2439        }
2440    }
2441
2442    /// The custom compaction window, if any has been set. This does not reflect any propagated
2443    /// compaction window (i.e., source -> subsource).
2444    pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2445        match self {
2446            CatalogItem::Table(table) => table.custom_logical_compaction_window,
2447            CatalogItem::Source(source) => source.custom_logical_compaction_window,
2448            CatalogItem::Index(index) => index.custom_logical_compaction_window,
2449            CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2450            CatalogItem::Log(_)
2451            | CatalogItem::View(_)
2452            | CatalogItem::Sink(_)
2453            | CatalogItem::Type(_)
2454            | CatalogItem::Func(_)
2455            | CatalogItem::Secret(_)
2456            | CatalogItem::Connection(_) => None,
2457        }
2458    }
2459
2460    /// Mutable access to the custom compaction window, or None if this type does not support custom
2461    /// compaction windows. This does not reflect any propagated compaction window (i.e., source ->
2462    /// subsource).
2463    pub fn custom_logical_compaction_window_mut(
2464        &mut self,
2465    ) -> Option<&mut Option<CompactionWindow>> {
2466        let cw = match self {
2467            CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2468            CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2469            CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2470            CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2471            CatalogItem::Log(_)
2472            | CatalogItem::View(_)
2473            | CatalogItem::Sink(_)
2474            | CatalogItem::Type(_)
2475            | CatalogItem::Func(_)
2476            | CatalogItem::Secret(_)
2477            | CatalogItem::Connection(_) => return None,
2478        };
2479        Some(cw)
2480    }
2481
2482    /// The initial compaction window, for objects that have one; that is, tables, sources, indexes,
2483    /// and MVs. This does not reflect any propagated compaction window (i.e., source -> subsource).
2484    ///
2485    /// If `custom_logical_compaction_window()` returns something, use that.  Otherwise, use a
2486    /// sensible default (currently 1s).
2487    ///
2488    /// For objects that do not have the concept of compaction window, return None.
2489    pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2490        let custom_logical_compaction_window = match self {
2491            CatalogItem::Table(_)
2492            | CatalogItem::Source(_)
2493            | CatalogItem::Index(_)
2494            | CatalogItem::MaterializedView(_) => self.custom_logical_compaction_window(),
2495            CatalogItem::Log(_)
2496            | CatalogItem::View(_)
2497            | CatalogItem::Sink(_)
2498            | CatalogItem::Type(_)
2499            | CatalogItem::Func(_)
2500            | CatalogItem::Secret(_)
2501            | CatalogItem::Connection(_) => return None,
2502        };
2503        Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2504    }
2505
2506    /// Whether the item's logical compaction window
2507    /// is controlled by the METRICS_RETENTION
2508    /// system var.
2509    pub fn is_retained_metrics_object(&self) -> bool {
2510        match self {
2511            CatalogItem::Table(table) => table.is_retained_metrics_object,
2512            CatalogItem::Source(source) => source.is_retained_metrics_object,
2513            CatalogItem::Index(index) => index.is_retained_metrics_object,
2514            CatalogItem::Log(_)
2515            | CatalogItem::View(_)
2516            | CatalogItem::MaterializedView(_)
2517            | CatalogItem::Sink(_)
2518            | CatalogItem::Type(_)
2519            | CatalogItem::Func(_)
2520            | CatalogItem::Secret(_)
2521            | CatalogItem::Connection(_) => false,
2522        }
2523    }
2524
    /// Serializes this item for durable storage: the `CREATE` SQL, the root [`GlobalId`],
    /// and — for versioned items (tables and materialized views) — the [`GlobalId`]s of
    /// all non-root versions. Clones out of `self`; see `into_serialized` for the
    /// consuming variant.
    ///
    /// # Panics
    ///
    /// Panics when called on an item that cannot be serialized: builtin logs, funcs,
    /// introspection sources, or any builtin item whose `create_sql` is `None`.
    pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
        match self {
            CatalogItem::Table(table) => {
                let create_sql = table
                    .create_sql
                    .clone()
                    .expect("builtin tables cannot be serialized");
                // The root version's GlobalId is returned separately; the map keeps
                // only the non-root versions.
                let mut collections = table.collections.clone();
                let global_id = collections
                    .remove(&RelationVersion::root())
                    .expect("at least one version");
                (create_sql, global_id, collections)
            }
            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
            CatalogItem::Source(source) => {
                assert!(
                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
                    "cannot serialize introspection/builtin sources",
                );
                let create_sql = source
                    .create_sql
                    .clone()
                    .expect("builtin sources cannot be serialized");
                (create_sql, source.global_id, BTreeMap::new())
            }
            CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
            CatalogItem::MaterializedView(mview) => {
                // Same root-version split as tables above.
                let mut collections = mview.collections.clone();
                let global_id = collections
                    .remove(&RelationVersion::root())
                    .expect("at least one version");
                (mview.create_sql.clone(), global_id, collections)
            }
            CatalogItem::Index(index) => {
                (index.create_sql.clone(), index.global_id, BTreeMap::new())
            }
            CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
            CatalogItem::Type(typ) => {
                let create_sql = typ
                    .create_sql
                    .clone()
                    .expect("builtin types cannot be serialized");
                (create_sql, typ.global_id, BTreeMap::new())
            }
            CatalogItem::Secret(secret) => {
                (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
            }
            CatalogItem::Connection(connection) => (
                connection.create_sql.clone(),
                connection.global_id,
                BTreeMap::new(),
            ),
            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
        }
    }
2580
    /// Like `to_serialized`, but consumes `self`, moving the `CREATE` SQL and version
    /// maps out instead of cloning them.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as `to_serialized`: builtin logs, funcs,
    /// introspection sources, or builtin items whose `create_sql` is `None`.
    pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
        match self {
            CatalogItem::Table(mut table) => {
                let create_sql = table
                    .create_sql
                    .expect("builtin tables cannot be serialized");
                // Split the root version's GlobalId out of the version map.
                let global_id = table
                    .collections
                    .remove(&RelationVersion::root())
                    .expect("at least one version");
                (create_sql, global_id, table.collections)
            }
            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
            CatalogItem::Source(source) => {
                assert!(
                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
                    "cannot serialize introspection/builtin sources",
                );
                let create_sql = source
                    .create_sql
                    .expect("builtin sources cannot be serialized");
                (create_sql, source.global_id, BTreeMap::new())
            }
            CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
            CatalogItem::MaterializedView(mut mview) => {
                let global_id = mview
                    .collections
                    .remove(&RelationVersion::root())
                    .expect("at least one version");
                (mview.create_sql, global_id, mview.collections)
            }
            CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
            CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
            CatalogItem::Type(typ) => {
                let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
                (create_sql, typ.global_id, BTreeMap::new())
            }
            CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
            CatalogItem::Connection(connection) => {
                (connection.create_sql, connection.global_id, BTreeMap::new())
            }
            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
        }
    }
2625
2626    /// Returns a global ID for a specific version selector. Returns `None` if the item does
2627    /// not have versions or if the version does not exist.
2628    pub fn global_id_for_version(&self, version: RelationVersionSelector) -> Option<GlobalId> {
2629        let collections = match self {
2630            CatalogItem::MaterializedView(mv) => &mv.collections,
2631            CatalogItem::Table(table) => &table.collections,
2632            CatalogItem::Source(source) => return Some(source.global_id),
2633            CatalogItem::Log(log) => return Some(log.global_id),
2634            CatalogItem::View(view) => return Some(view.global_id),
2635            CatalogItem::Sink(sink) => return Some(sink.global_id),
2636            CatalogItem::Index(index) => return Some(index.global_id),
2637            CatalogItem::Type(ty) => return Some(ty.global_id),
2638            CatalogItem::Func(func) => return Some(func.global_id),
2639            CatalogItem::Secret(secret) => return Some(secret.global_id),
2640            CatalogItem::Connection(conn) => return Some(conn.global_id),
2641        };
2642        match version {
2643            RelationVersionSelector::Latest => collections.values().last().copied(),
2644            RelationVersionSelector::Specific(version) => collections.get(&version).copied(),
2645        }
2646    }
2647}
2648
impl CatalogEntry {
    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`], if it
    /// produces rows.
    pub fn relation_desc_latest(&self) -> Option<Cow<'_, RelationDesc>> {
        self.item.relation_desc(RelationVersionSelector::Latest)
    }

    /// Reports if the item has columns.
    pub fn has_columns(&self) -> bool {
        match self.item() {
            // A `Record` type has named columns; other types do not.
            CatalogItem::Type(Type { details, .. }) => {
                matches!(details.typ, CatalogType::Record { .. })
            }
            // Everything else has columns exactly when it has a relation description.
            _ => self.relation_desc_latest().is_some(),
        }
    }

    /// Returns the [`mz_sql::func::Func`] associated with this `CatalogEntry`.
    pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.item.func(self)
    }

    /// Returns the inner [`Index`] if this entry is an index, else `None`.
    pub fn index(&self) -> Option<&Index> {
        match self.item() {
            CatalogItem::Index(idx) => Some(idx),
            _ => None,
        }
    }

    /// Returns the inner [`MaterializedView`] if this entry is a materialized view, else `None`.
    pub fn materialized_view(&self) -> Option<&MaterializedView> {
        match self.item() {
            CatalogItem::MaterializedView(mv) => Some(mv),
            _ => None,
        }
    }

    /// Returns the inner [`Table`] if this entry is a table, else `None`.
    pub fn table(&self) -> Option<&Table> {
        match self.item() {
            CatalogItem::Table(tbl) => Some(tbl),
            _ => None,
        }
    }

    /// Returns the inner [`Source`] if this entry is a source, else `None`.
    pub fn source(&self) -> Option<&Source> {
        match self.item() {
            CatalogItem::Source(src) => Some(src),
            _ => None,
        }
    }

    /// Returns the inner [`Sink`] if this entry is a sink, else `None`.
    pub fn sink(&self) -> Option<&Sink> {
        match self.item() {
            CatalogItem::Sink(sink) => Some(sink),
            _ => None,
        }
    }

    /// Returns the inner [`Secret`] if this entry is a secret, else `None`.
    pub fn secret(&self) -> Option<&Secret> {
        match self.item() {
            CatalogItem::Secret(secret) => Some(secret),
            _ => None,
        }
    }

    /// Returns the inner [`Connection`] if this entry is a connection, otherwise an
    /// [`SqlCatalogError::UnknownConnection`] naming this entry's qualified name.
    pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
        match self.item() {
            CatalogItem::Connection(connection) => Ok(connection),
            _ => {
                // Reconstruct a `db.schema.item` style name for the error message;
                // ambient (system) entries have no database prefix.
                let db_name = match self.name().qualifiers.database_spec {
                    ResolvedDatabaseSpecifier::Ambient => "".to_string(),
                    ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
                };
                Err(SqlCatalogError::UnknownConnection(format!(
                    "{}{}.{}",
                    db_name,
                    self.name().qualifiers.schema_spec,
                    self.name().item
                )))
            }
        }
    }

    /// Returns the [`mz_storage_types::sources::SourceDesc`] associated with
    /// this `CatalogEntry`, if any.
    pub fn source_desc(
        &self,
    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.item.source_desc(self)
    }

    /// Reports whether this catalog entry is a connection.
    pub fn is_connection(&self) -> bool {
        matches!(self.item(), CatalogItem::Connection(_))
    }

    /// Reports whether this catalog entry is a table.
    pub fn is_table(&self) -> bool {
        matches!(self.item(), CatalogItem::Table(_))
    }

    /// Reports whether this catalog entry is a source. Note that this includes
    /// subsources.
    pub fn is_source(&self) -> bool {
        matches!(self.item(), CatalogItem::Source(_))
    }

    /// Reports whether this catalog entry is a subsource and, if it is, the
    /// ingestion it is an export of, as well as the item it exports.
    pub fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        match &self.item() {
            CatalogItem::Source(source) => match &source.data_source {
                DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config: _,
                } => Some((*ingestion_id, external_reference, details)),
                _ => None,
            },
            _ => None,
        }
    }

    /// Reports whether this catalog entry is a source export and, if it is, the
    /// ingestion it is an export of, as well as the item it exports.
    pub fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        // Like `subsource_details`, but also matches table-backed source exports
        // and additionally returns the export's data config.
        match &self.item() {
            CatalogItem::Source(source) => match &source.data_source {
                DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => Some((*ingestion_id, external_reference, details, data_config)),
                _ => None,
            },
            CatalogItem::Table(table) => match &table.data_source {
                TableDataSource::DataSource {
                    desc:
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference,
                            details,
                            data_config,
                        },
                    timeline: _,
                } => Some((*ingestion_id, external_reference, details, data_config)),
                _ => None,
            },
            _ => None,
        }
    }

    /// Reports whether this catalog entry is a progress source.
    pub fn is_progress_source(&self) -> bool {
        self.item().is_progress_source()
    }

    /// Returns the [`CatalogItemId`] of this entry's progress collection, if this
    /// entry is an ingestion that has one.
    pub fn progress_id(&self) -> Option<CatalogItemId> {
        match &self.item() {
            CatalogItem::Source(source) => match &source.data_source {
                // New-syntax ingestions track progress under their own ID.
                DataSourceDesc::Ingestion { .. } => Some(self.id),
                // Old-syntax ingestions have a dedicated progress subsource.
                DataSourceDesc::OldSyntaxIngestion {
                    progress_subsource, ..
                } => Some(*progress_subsource),
                DataSourceDesc::IngestionExport { .. }
                | DataSourceDesc::Introspection(_)
                | DataSourceDesc::Progress
                | DataSourceDesc::Webhook { .. }
                | DataSourceDesc::Catalog => None,
            },
            CatalogItem::Table(_)
            | CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Sink(_)
            | CatalogItem::Index(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_) => None,
        }
    }

    /// Reports whether this catalog entry is a sink.
    pub fn is_sink(&self) -> bool {
        matches!(self.item(), CatalogItem::Sink(_))
    }

    /// Reports whether this catalog entry is a materialized view.
    pub fn is_materialized_view(&self) -> bool {
        matches!(self.item(), CatalogItem::MaterializedView(_))
    }

    /// Reports whether this catalog entry is a view.
    pub fn is_view(&self) -> bool {
        matches!(self.item(), CatalogItem::View(_))
    }

    /// Reports whether this catalog entry is a secret.
    pub fn is_secret(&self) -> bool {
        matches!(self.item(), CatalogItem::Secret(_))
    }

    /// Reports whether this catalog entry is an introspection source.
    pub fn is_introspection_source(&self) -> bool {
        matches!(self.item(), CatalogItem::Log(_))
    }

    /// Reports whether this catalog entry is an index.
    pub fn is_index(&self) -> bool {
        matches!(self.item(), CatalogItem::Index(_))
    }

    /// Reports whether this catalog entry can be treated as a relation, it can produce rows.
    pub fn is_relation(&self) -> bool {
        mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
    }

    /// Collects the identifiers of the objects that were encountered when
    /// resolving names in the item's DDL statement.
    pub fn references(&self) -> &ResolvedIds {
        self.item.references()
    }

    /// Collects the identifiers of the objects used by this [`CatalogEntry`].
    ///
    /// Like [`CatalogEntry::references()`] but also includes objects that are not directly
    /// referenced. For example this will include any catalog objects used to implement functions
    /// and casts in the item.
    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.item.uses()
    }

    /// Returns the `CatalogItem` associated with this catalog entry.
    pub fn item(&self) -> &CatalogItem {
        &self.item
    }

    /// Returns a mutable reference to the `CatalogItem` associated with this
    /// catalog entry.
    pub fn item_mut(&mut self) -> &mut CatalogItem {
        &mut self.item
    }

    /// Returns the [`CatalogItemId`] of this catalog entry.
    pub fn id(&self) -> CatalogItemId {
        self.id
    }

    /// Returns all of the [`GlobalId`]s associated with this item.
    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.item().global_ids()
    }

    /// Returns the [`GlobalId`] of this item's most recent version.
    pub fn latest_global_id(&self) -> GlobalId {
        self.item().latest_global_id()
    }

    /// Returns the OID of this catalog entry.
    pub fn oid(&self) -> u32 {
        self.oid
    }

    /// Returns the fully qualified name of this catalog entry.
    pub fn name(&self) -> &QualifiedItemName {
        &self.name
    }

    /// Returns the identifiers of the dataflows that are directly referenced by this dataflow.
    pub fn referenced_by(&self) -> &[CatalogItemId] {
        &self.referenced_by
    }

    /// Returns the identifiers of the dataflows that depend upon this dataflow.
    pub fn used_by(&self) -> &[CatalogItemId] {
        &self.used_by
    }

    /// Returns the connection ID that this item belongs to, if this item is
    /// temporary.
    pub fn conn_id(&self) -> Option<&ConnectionId> {
        self.item.conn_id()
    }

    /// Returns the role ID of the entry owner.
    pub fn owner_id(&self) -> &RoleId {
        &self.owner_id
    }

    /// Returns the privileges of the entry.
    pub fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }

    /// Returns the comment object ID for this entry.
    pub fn comment_object_id(&self) -> CommentObjectId {
        use CatalogItemType::*;
        match self.item_type() {
            Table => CommentObjectId::Table(self.id),
            Source => CommentObjectId::Source(self.id),
            Sink => CommentObjectId::Sink(self.id),
            View => CommentObjectId::View(self.id),
            MaterializedView => CommentObjectId::MaterializedView(self.id),
            Index => CommentObjectId::Index(self.id),
            Func => CommentObjectId::Func(self.id),
            Connection => CommentObjectId::Connection(self.id),
            Type => CommentObjectId::Type(self.id),
            Secret => CommentObjectId::Secret(self.id),
        }
    }
}
2977
/// In-memory store of all object comments, keyed by the commented object.
///
/// The inner map is keyed by optional sub-component: `None` holds the comment on
/// the object itself, while `Some(i)` holds the comment on one of its parts
/// (for relations, column `i`).
#[derive(Debug, Clone, Default)]
pub struct CommentsMap {
    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
}
2982
2983impl CommentsMap {
2984    pub fn update_comment(
2985        &mut self,
2986        object_id: CommentObjectId,
2987        sub_component: Option<usize>,
2988        comment: Option<String>,
2989    ) -> Option<String> {
2990        let object_comments = self.map.entry(object_id).or_default();
2991
2992        // Either replace the existing comment, or remove it if comment is None/NULL.
2993        let (empty, prev) = if let Some(comment) = comment {
2994            let prev = object_comments.insert(sub_component, comment);
2995            (false, prev)
2996        } else {
2997            let prev = object_comments.remove(&sub_component);
2998            (object_comments.is_empty(), prev)
2999        };
3000
3001        // Cleanup entries that are now empty.
3002        if empty {
3003            self.map.remove(&object_id);
3004        }
3005
3006        // Return the previous comment, if there was one, for easy removal.
3007        prev
3008    }
3009
3010    /// Remove all comments for `object_id` from the map.
3011    ///
3012    /// Generally there is one comment for a given [`CommentObjectId`], but in the case of
3013    /// relations you can also have comments on the individual columns. Dropping the comments for a
3014    /// relation will also drop all of the comments on any columns.
3015    pub fn drop_comments(
3016        &mut self,
3017        object_ids: &BTreeSet<CommentObjectId>,
3018    ) -> Vec<(CommentObjectId, Option<usize>, String)> {
3019        let mut removed_comments = Vec::new();
3020
3021        for object_id in object_ids {
3022            if let Some(comments) = self.map.remove(object_id) {
3023                let removed = comments
3024                    .into_iter()
3025                    .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
3026                removed_comments.extend(removed);
3027            }
3028        }
3029
3030        removed_comments
3031    }
3032
3033    pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
3034        self.map
3035            .iter()
3036            .map(|(id, comments)| {
3037                comments
3038                    .iter()
3039                    .map(|(pos, comment)| (*id, *pos, comment.as_str()))
3040            })
3041            .flatten()
3042    }
3043
3044    pub fn get_object_comments(
3045        &self,
3046        object_id: CommentObjectId,
3047    ) -> Option<&BTreeMap<Option<usize>, String>> {
3048        self.map.get(&object_id)
3049    }
3050}
3051
3052impl Serialize for CommentsMap {
3053    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
3054    where
3055        S: serde::Serializer,
3056    {
3057        let comment_count = self
3058            .map
3059            .iter()
3060            .map(|(_object_id, comments)| comments.len())
3061            .sum();
3062
3063        let mut seq = serializer.serialize_seq(Some(comment_count))?;
3064        for (object_id, sub) in &self.map {
3065            for (sub_component, comment) in sub {
3066                seq.serialize_element(&(
3067                    format!("{object_id:?}"),
3068                    format!("{sub_component:?}"),
3069                    comment,
3070                ))?;
3071            }
3072        }
3073        seq.end()
3074    }
3075}
3076
/// The set of all default privileges in the catalog: for each object description, the
/// privileges that will be granted on matching newly created objects.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
pub struct DefaultPrivileges {
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
}
3082
// Use a new type here because otherwise we have two levels of BTreeMap, both needing
// map_key_to_string for serialization; the newtype lets the serde attribute be applied
// to the inner map independently.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
struct RoleDefaultPrivileges(
    /// Denormalized, the key is the grantee Role.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
);
3091
// Expose the inner map's read API directly on the newtype.
impl Deref for RoleDefaultPrivileges {
    type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
3099
// Expose the inner map's mutation API directly on the newtype.
impl DerefMut for RoleDefaultPrivileges {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
3105
3106impl DefaultPrivileges {
3107    /// Add a new default privilege into the set of all default privileges.
3108    pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
3109        if privilege.acl_mode.is_empty() {
3110            return;
3111        }
3112
3113        let privileges = self.privileges.entry(object).or_default();
3114        if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3115            default_privilege.acl_mode |= privilege.acl_mode;
3116        } else {
3117            privileges.insert(privilege.grantee, privilege);
3118        }
3119    }
3120
3121    /// Revoke a default privilege from the set of all default privileges.
3122    pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
3123        if let Some(privileges) = self.privileges.get_mut(object) {
3124            if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3125                default_privilege.acl_mode =
3126                    default_privilege.acl_mode.difference(privilege.acl_mode);
3127                if default_privilege.acl_mode.is_empty() {
3128                    privileges.remove(&privilege.grantee);
3129                }
3130            }
3131            if privileges.is_empty() {
3132                self.privileges.remove(object);
3133            }
3134        }
3135    }
3136
3137    /// Get the privileges that will be granted on all objects matching `object` to `grantee`, if
3138    /// any exist.
3139    pub fn get_privileges_for_grantee(
3140        &self,
3141        object: &DefaultPrivilegeObject,
3142        grantee: &RoleId,
3143    ) -> Option<&AclMode> {
3144        self.privileges
3145            .get(object)
3146            .and_then(|privileges| privileges.get(grantee))
3147            .map(|privilege| &privilege.acl_mode)
3148    }
3149
3150    /// Get all default privileges that apply to the provided object details.
3151    pub fn get_applicable_privileges(
3152        &self,
3153        role_id: RoleId,
3154        database_id: Option<DatabaseId>,
3155        schema_id: Option<SchemaId>,
3156        object_type: mz_sql::catalog::ObjectType,
3157    ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
3158        // Privileges consider all relations to be of type table due to PostgreSQL compatibility. We
3159        // don't require the caller to worry about that and we will map their `object_type` to the
3160        // correct type for privileges.
3161        let privilege_object_type = if object_type.is_relation() {
3162            mz_sql::catalog::ObjectType::Table
3163        } else {
3164            object_type
3165        };
3166        let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
3167
3168        // Collect all entries that apply to the provided object details.
3169        // If either `database_id` or `schema_id` are `None`, then we might end up with duplicate
3170        // entries in the vec below. That's OK because we consolidate the results after.
3171        [
3172            DefaultPrivilegeObject {
3173                role_id,
3174                database_id,
3175                schema_id,
3176                object_type: privilege_object_type,
3177            },
3178            DefaultPrivilegeObject {
3179                role_id,
3180                database_id,
3181                schema_id: None,
3182                object_type: privilege_object_type,
3183            },
3184            DefaultPrivilegeObject {
3185                role_id,
3186                database_id: None,
3187                schema_id: None,
3188                object_type: privilege_object_type,
3189            },
3190            DefaultPrivilegeObject {
3191                role_id: RoleId::Public,
3192                database_id,
3193                schema_id,
3194                object_type: privilege_object_type,
3195            },
3196            DefaultPrivilegeObject {
3197                role_id: RoleId::Public,
3198                database_id,
3199                schema_id: None,
3200                object_type: privilege_object_type,
3201            },
3202            DefaultPrivilegeObject {
3203                role_id: RoleId::Public,
3204                database_id: None,
3205                schema_id: None,
3206                object_type: privilege_object_type,
3207            },
3208        ]
3209        .into_iter()
3210        .filter_map(|object| self.privileges.get(&object))
3211        .flat_map(|acl_map| acl_map.values())
3212        // Consolidate privileges with a common grantee.
3213        .fold(
3214            BTreeMap::new(),
3215            |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
3216                let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
3217                *accum_acl_mode |= *acl_mode;
3218                accum
3219            },
3220        )
3221        .into_iter()
3222        // Restrict the acl_mode to only privileges valid for the provided object type. If the
3223        // default privilege has an object type of Table, then it may contain privileges valid for
3224        // tables but not other relations. If the passed in object type is another relation, then
3225        // we need to remove any privilege that is not valid for the specified relation.
3226        .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
3227        // Filter out empty privileges.
3228        .filter(|(_, acl_mode)| !acl_mode.is_empty())
3229        .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
3230            grantee: *grantee,
3231            acl_mode,
3232        })
3233    }
3234
3235    pub fn iter(
3236        &self,
3237    ) -> impl Iterator<
3238        Item = (
3239            &DefaultPrivilegeObject,
3240            impl Iterator<Item = &DefaultPrivilegeAclItem>,
3241        ),
3242    > {
3243        self.privileges
3244            .iter()
3245            .map(|(object, acl_map)| (object, acl_map.values()))
3246    }
3247}
3248
/// In-memory configuration of a cluster; convertible to and from
/// [`durable::ClusterConfig`].
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterConfig {
    // Managed vs. unmanaged settings for this cluster.
    pub variant: ClusterVariant,
    // Optional workload class label; presumably used for workload
    // classification elsewhere — confirm against callers.
    pub workload_class: Option<String>,
}
3254
3255impl ClusterConfig {
3256    pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
3257        match &self.variant {
3258            ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
3259            ClusterVariant::Unmanaged => None,
3260        }
3261    }
3262}
3263
3264impl From<ClusterConfig> for durable::ClusterConfig {
3265    fn from(config: ClusterConfig) -> Self {
3266        Self {
3267            variant: config.variant.into(),
3268            workload_class: config.workload_class,
3269        }
3270    }
3271}
3272
3273impl From<durable::ClusterConfig> for ClusterConfig {
3274    fn from(config: durable::ClusterConfig) -> Self {
3275        Self {
3276            variant: config.variant.into(),
3277            workload_class: config.workload_class,
3278        }
3279    }
3280}
3281
/// In-memory settings of a managed cluster; convertible to and from
/// [`durable::ClusterVariantManaged`].
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterVariantManaged {
    // Replica size name (see `managed_size` on `CatalogCluster`).
    pub size: String,
    // Availability zones the cluster's replicas may be placed in — TODO
    // confirm placement semantics against the controller.
    pub availability_zones: Vec<String>,
    // Introspection/logging configuration for replicas.
    pub logging: ReplicaLogging,
    // Number of replicas to maintain.
    pub replication_factor: u32,
    // Optimizer feature overrides; exposed via `ClusterConfig::features`.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    // Cluster schedule; exposed via `CatalogCluster::schedule`.
    pub schedule: ClusterSchedule,
}
3291
3292impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3293    fn from(managed: ClusterVariantManaged) -> Self {
3294        Self {
3295            size: managed.size,
3296            availability_zones: managed.availability_zones,
3297            logging: managed.logging,
3298            replication_factor: managed.replication_factor,
3299            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3300            schedule: managed.schedule,
3301        }
3302    }
3303}
3304
3305impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3306    fn from(managed: durable::ClusterVariantManaged) -> Self {
3307        Self {
3308            size: managed.size,
3309            availability_zones: managed.availability_zones,
3310            logging: managed.logging,
3311            replication_factor: managed.replication_factor,
3312            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3313            schedule: managed.schedule,
3314        }
3315    }
3316}
3317
/// Whether a cluster is managed by the system or by the user; convertible to
/// and from [`durable::ClusterVariant`].
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterVariant {
    // System-managed cluster with its managed settings.
    Managed(ClusterVariantManaged),
    // User-managed cluster; carries no extra settings.
    Unmanaged,
}
3323
3324impl From<ClusterVariant> for durable::ClusterVariant {
3325    fn from(variant: ClusterVariant) -> Self {
3326        match variant {
3327            ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3328            ClusterVariant::Unmanaged => Self::Unmanaged,
3329        }
3330    }
3331}
3332
3333impl From<durable::ClusterVariant> for ClusterVariant {
3334    fn from(variant: durable::ClusterVariant) -> Self {
3335        match variant {
3336            durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3337            durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3338        }
3339    }
3340}
3341
// Exposes the in-memory `Database` through the SQL layer's catalog trait.
// All methods are thin accessors over the struct's fields.
impl mz_sql::catalog::CatalogDatabase for Database {
    fn name(&self) -> &str {
        &self.name
    }

    fn id(&self) -> DatabaseId {
        self.id
    }

    // A database has schemas iff the name index is non-empty.
    fn has_schemas(&self) -> bool {
        !self.schemas_by_name.is_empty()
    }

    fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
        &self.schemas_by_name
    }

    // `as` is ok to use to cast to a trait object.
    #[allow(clippy::as_conversions)]
    fn schemas(&self) -> Vec<&dyn CatalogSchema> {
        self.schemas_by_id
            .values()
            .map(|schema| schema as &dyn CatalogSchema)
            .collect()
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3376
// Exposes the in-memory `Schema` through the SQL layer's catalog trait.
impl mz_sql::catalog::CatalogSchema for Schema {
    fn database(&self) -> &ResolvedDatabaseSpecifier {
        &self.name.database
    }

    fn name(&self) -> &QualifiedSchemaName {
        &self.name
    }

    fn id(&self) -> &SchemaSpecifier {
        &self.id
    }

    // Only considers `items`; functions and types do not count here, unlike
    // `item_ids` below — NOTE(review): confirm this asymmetry is intended.
    fn has_items(&self) -> bool {
        !self.items.is_empty()
    }

    // Yields the ids of all items, functions, and types in this schema.
    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
        Box::new(
            self.items
                .values()
                .chain(self.functions.values())
                .chain(self.types.values())
                .copied(),
        )
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3412
// Exposes the in-memory `Role` through the SQL layer's catalog trait.
impl mz_sql::catalog::CatalogRole for Role {
    fn name(&self) -> &str {
        &self.name
    }

    fn id(&self) -> RoleId {
        self.id
    }

    // Role membership map; key/value semantics are defined by
    // `RoleMembership` — presumably member role to grantor — TODO confirm.
    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
        &self.membership.map
    }

    fn attributes(&self) -> &RoleAttributes {
        &self.attributes
    }

    // Role-level session variable defaults, keyed by variable name.
    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
        &self.vars.map
    }
}
3434
// Exposes the in-memory `NetworkPolicy` through the SQL layer's catalog trait.
impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
    fn name(&self) -> &str {
        &self.name
    }

    fn id(&self) -> NetworkPolicyId {
        self.id
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3452
// Exposes the in-memory `Cluster` through the SQL layer's catalog trait.
// Several methods delegate to inherent methods of the same name.
impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
    fn name(&self) -> &str {
        &self.name
    }

    fn id(&self) -> ClusterId {
        self.id
    }

    fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
        &self.bound_objects
    }

    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
        &self.replica_id_by_name_
    }

    // `as` is ok to use to cast to a trait object.
    #[allow(clippy::as_conversions)]
    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
        // Delegates to the inherent `replicas`, which yields the replicas as
        // an iterator of concrete `ClusterReplica`s.
        self.replicas()
            .map(|replica| replica as &dyn CatalogClusterReplica)
            .collect()
    }

    // Panics if `id` is not a replica of this cluster: the catalog is
    // expected to be internally consistent.
    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
        self.replica(id).expect("catalog out of sync")
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }

    // Delegates to the inherent `is_managed`.
    fn is_managed(&self) -> bool {
        self.is_managed()
    }

    // Replica size name for managed clusters; `None` for unmanaged ones.
    fn managed_size(&self) -> Option<&str> {
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
            ClusterVariant::Unmanaged => None,
        }
    }

    // Schedule for managed clusters; `None` for unmanaged ones.
    fn schedule(&self) -> Option<&ClusterSchedule> {
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
            ClusterVariant::Unmanaged => None,
        }
    }

    // Delegates to the inherent `try_to_plan`.
    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
        self.try_to_plan()
    }
}
3512
// Exposes the in-memory `ClusterReplica` through the SQL layer's catalog
// trait. All methods are thin accessors.
impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
    fn name(&self) -> &str {
        &self.name
    }

    fn cluster_id(&self) -> ClusterId {
        self.cluster_id
    }

    fn replica_id(&self) -> ReplicaId {
        self.replica_id
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    // Whether the replica's location is internal, per its configuration.
    fn internal(&self) -> bool {
        self.config.location.internal()
    }
}
3534
// Exposes the in-memory `CatalogEntry` through the SQL layer's catalog trait.
// Most methods delegate to inherent methods of the same name; the rest
// pattern-match on the entry's `CatalogItem` variant.
impl mz_sql::catalog::CatalogItem for CatalogEntry {
    fn name(&self) -> &QualifiedItemName {
        self.name()
    }

    fn id(&self) -> CatalogItemId {
        self.id()
    }

    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.global_ids())
    }

    fn oid(&self) -> u32 {
        self.oid()
    }

    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.func()
    }

    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.source_desc()
    }

    // Resolves the stored connection details into a full connection; errors
    // if this entry is not a connection.
    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        Ok(self.connection()?.details.to_connection())
    }

    // Returns the item's `CREATE` SQL. Builtin items (and items whose SQL is
    // absent) are reported as the placeholder string "<builtin>".
    fn create_sql(&self) -> &str {
        match self.item() {
            CatalogItem::Table(Table { create_sql, .. }) => {
                create_sql.as_deref().unwrap_or("<builtin>")
            }
            CatalogItem::Source(Source { create_sql, .. }) => {
                create_sql.as_deref().unwrap_or("<builtin>")
            }
            CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
            CatalogItem::View(View { create_sql, .. }) => create_sql,
            CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
            CatalogItem::Index(Index { create_sql, .. }) => create_sql,
            CatalogItem::Type(Type { create_sql, .. }) => {
                create_sql.as_deref().unwrap_or("<builtin>")
            }
            CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
            CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
            CatalogItem::Func(_) => "<builtin>",
            CatalogItem::Log(_) => "<builtin>",
        }
    }

    fn item_type(&self) -> SqlCatalogItemType {
        self.item().typ()
    }

    // For an index: its key expressions and the id of the indexed collection.
    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
            Some((keys, *on))
        } else {
            None
        }
    }

    // For a writable table (one backed by table writes): its column defaults.
    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        if let CatalogItem::Table(Table {
            data_source: TableDataSource::TableWrites { defaults },
            ..
        }) = self.item()
        {
            Some(defaults.as_slice())
        } else {
            None
        }
    }

    // For a materialized view: the item it replaces, if any.
    fn replacement_target(&self) -> Option<CatalogItemId> {
        if let CatalogItem::MaterializedView(mv) = self.item() {
            mv.replacement_target
        } else {
            None
        }
    }

    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        if let CatalogItem::Type(Type { details, .. }) = self.item() {
            Some(details)
        } else {
            None
        }
    }

    fn references(&self) -> &ResolvedIds {
        self.references()
    }

    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.uses()
    }

    fn referenced_by(&self) -> &[CatalogItemId] {
        self.referenced_by()
    }

    fn used_by(&self) -> &[CatalogItemId] {
        self.used_by()
    }

    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.subsource_details()
    }

    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.source_export_details()
    }

    fn is_progress_source(&self) -> bool {
        self.is_progress_source()
    }

    fn progress_id(&self) -> Option<CatalogItemId> {
        self.progress_id()
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }

    fn cluster_id(&self) -> Option<ClusterId> {
        self.item().cluster_id()
    }

    // Wraps a clone of this entry together with the requested version
    // selector; cloning the entry is required to produce an owned trait
    // object.
    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        Box::new(CatalogCollectionEntry {
            entry: self.clone(),
            version,
        })
    }

    // Latest relation version; only tables are versioned here, so this is
    // `None` for every other item type.
    fn latest_version(&self) -> Option<RelationVersion> {
        self.table().map(|t| t.desc.latest_version())
    }
}
3696
/// A single update to the catalog state.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdate {
    // What changed.
    pub kind: StateUpdateKind,
    // Timestamp at which the change occurred.
    pub ts: Timestamp,
    // Whether the update adds or retracts `kind`.
    pub diff: StateDiff,
}
3704
/// The contents of a single state update.
///
/// Variants are listed in dependency order.
///
/// This mirrors [`BootstrapStateUpdateKind`] with the addition of
/// [`StateUpdateKind::TemporaryItem`]; keep the two enums in sync.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum StateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    // Temporary items are not actually updated via the durable catalog, but
    // this allows us to model them the same way as all other items in parts of
    // the pipeline.
    TemporaryItem(TemporaryItem),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    // Storage updates.
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3734
/// Valid diffs for catalog state updates.
///
/// Catalog updates are always exactly +1 or -1; see the `From`/`TryFrom`
/// conversions with [`Diff`] below.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub enum StateDiff {
    // Corresponds to a `Diff` of -1.
    Retraction,
    // Corresponds to a `Diff` of +1.
    Addition,
}
3741
3742impl From<StateDiff> for Diff {
3743    fn from(diff: StateDiff) -> Self {
3744        match diff {
3745            StateDiff::Retraction => Diff::MINUS_ONE,
3746            StateDiff::Addition => Diff::ONE,
3747        }
3748    }
3749}
3750impl TryFrom<Diff> for StateDiff {
3751    type Error = String;
3752
3753    fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3754        match diff {
3755            Diff::MINUS_ONE => Ok(Self::Retraction),
3756            Diff::ONE => Ok(Self::Addition),
3757            diff => Err(format!("invalid diff {diff}")),
3758        }
3759    }
3760}
3761
/// Information needed to process an update to a temporary item.
#[derive(Debug, Clone, Ord, PartialOrd, PartialEq, Eq)]
pub struct TemporaryItem {
    // Catalog-level id of the item.
    pub id: CatalogItemId,
    // OID of the item.
    pub oid: u32,
    // Collection-level (global) id of the item.
    pub global_id: GlobalId,
    // Schema the item lives in.
    pub schema_id: SchemaId,
    // Unqualified item name.
    pub name: String,
    // Connection that owns this temporary item, if any — presumably the
    // session's connection; confirm against callers.
    pub conn_id: Option<ConnectionId>,
    // Serialized `CREATE` SQL of the item.
    pub create_sql: String,
    pub owner_id: RoleId,
    pub privileges: Vec<MzAclItem>,
    // Additional relation versions and their global ids.
    pub extra_versions: BTreeMap<RelationVersion, GlobalId>,
}
3776
impl From<CatalogEntry> for TemporaryItem {
    /// Flattens a full in-memory catalog entry into the reduced form used for
    /// temporary-item state updates, serializing the item's SQL along the way.
    fn from(entry: CatalogEntry) -> Self {
        // Clone the connection id before `entry.item` is (potentially) moved
        // by `to_serialized` below.
        let conn_id = entry.conn_id().cloned();
        let (create_sql, global_id, extra_versions) = entry.item.to_serialized();

        TemporaryItem {
            id: entry.id,
            oid: entry.oid,
            global_id,
            schema_id: entry.name.qualifiers.schema_spec.into(),
            name: entry.name.item,
            conn_id,
            create_sql,
            owner_id: entry.owner_id,
            privileges: entry.privileges.into_all_values().collect(),
            extra_versions,
        }
    }
}
3796
impl TemporaryItem {
    /// Returns the item's catalog type, derived from its stored `create_sql`
    /// via the module-level `item_type` helper.
    pub fn item_type(&self) -> CatalogItemType {
        item_type(&self.create_sql)
    }
}
3802
/// The same as [`StateUpdateKind`], but without `TemporaryItem` so we can derive [`Ord`].
///
/// NOTE(review): `StateUpdateKind` itself also derives `Ord` now — confirm
/// whether the derive rationale above is still accurate. Keep the variant
/// lists of the two enums in sync either way.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum BootstrapStateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    // Storage updates.
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3826
3827impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3828    fn from(value: BootstrapStateUpdateKind) -> Self {
3829        match value {
3830            BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3831            BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3832            BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3833            BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3834            BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3835                StateUpdateKind::DefaultPrivilege(kind)
3836            }
3837            BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3838                StateUpdateKind::SystemPrivilege(kind)
3839            }
3840            BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3841                StateUpdateKind::SystemConfiguration(kind)
3842            }
3843            BootstrapStateUpdateKind::SourceReferences(kind) => {
3844                StateUpdateKind::SourceReferences(kind)
3845            }
3846            BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3847            BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3848            BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3849                StateUpdateKind::IntrospectionSourceIndex(kind)
3850            }
3851            BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3852            BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3853                StateUpdateKind::SystemObjectMapping(kind)
3854            }
3855            BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3856            BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3857            BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3858            BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3859                StateUpdateKind::StorageCollectionMetadata(kind)
3860            }
3861            BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3862                StateUpdateKind::UnfinalizedShard(kind)
3863            }
3864        }
3865    }
3866}
3867
3868impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3869    type Error = TemporaryItem;
3870
3871    fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3872        match value {
3873            StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3874            StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3875            StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3876            StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3877            StateUpdateKind::DefaultPrivilege(kind) => {
3878                Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3879            }
3880            StateUpdateKind::SystemPrivilege(kind) => {
3881                Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3882            }
3883            StateUpdateKind::SystemConfiguration(kind) => {
3884                Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3885            }
3886            StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3887            StateUpdateKind::NetworkPolicy(kind) => {
3888                Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3889            }
3890            StateUpdateKind::IntrospectionSourceIndex(kind) => {
3891                Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3892            }
3893            StateUpdateKind::ClusterReplica(kind) => {
3894                Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3895            }
3896            StateUpdateKind::SourceReferences(kind) => {
3897                Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3898            }
3899            StateUpdateKind::SystemObjectMapping(kind) => {
3900                Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3901            }
3902            StateUpdateKind::TemporaryItem(kind) => Err(kind),
3903            StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3904            StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3905            StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3906            StateUpdateKind::StorageCollectionMetadata(kind) => {
3907                Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3908            }
3909            StateUpdateKind::UnfinalizedShard(kind) => {
3910                Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3911            }
3912        }
3913    }
3914}