Skip to main content

mz_catalog/memory/
objects.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The current types used by the in-memory Catalog. Many of the objects in this module are
11//! extremely similar to the objects found in [`crate::durable::objects`] but in a format that is
12//! easier consumed by higher layers.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_compute_client::logging::LogVariant;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_compute_types::plan::Plan as ComputePlan;
26use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
27use mz_controller_types::{ClusterId, ReplicaId};
28use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
29use mz_ore::collections::CollectionExt;
30use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
31use mz_repr::network_policy_id::NetworkPolicyId;
32use mz_repr::optimize::OptimizerFeatureOverrides;
33use mz_repr::refresh_schedule::RefreshSchedule;
34use mz_repr::role_id::RoleId;
35use mz_repr::{
36    CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
37    RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
38};
39use mz_sql::ast::display::AstDisplay;
40use mz_sql::ast::{
41    ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
42    UnresolvedItemName, Value, WithOptionValue,
43};
44use mz_sql::catalog::{
45    CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
46    CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogType,
47    CatalogTypeDetails, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference,
48    RoleAttributes, RoleMembership, RoleVars, SystemObjectType,
49};
50use mz_sql::names::{
51    Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
52    QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
53};
54use mz_sql::plan::{
55    ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
56    CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
57    HirRelationExpr, NetworkPolicyRule, PlanError, WebhookBodyFormat, WebhookHeaders,
58    WebhookValidation,
59};
60use mz_sql::rbac;
61use mz_sql::session::vars::OwnedVarInput;
62use mz_storage_client::controller::IntrospectionType;
63use mz_storage_types::connections::inline::ReferencedConnection;
64use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
65use mz_storage_types::sources::load_generator::LoadGenerator;
66use mz_storage_types::sources::{
67    GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
68    SourceExportDetails, Timeline,
69};
70use mz_transform::dataflow::DataflowMetainfo;
71use mz_transform::notice::OptimizerNotice;
72use serde::ser::SerializeSeq;
73use serde::{Deserialize, Serialize};
74use timely::progress::Antichain;
75use tracing::debug;
76
77use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
78use crate::durable;
79use crate::durable::objects::item_type;
80
/// Used to update `self` from the input value while consuming the input value.
pub trait UpdateFrom<T>: From<T> {
    /// Updates `self` in place from `from`, consuming `from`.
    ///
    /// Implementors are expected to overwrite every field that a `From<T>`
    /// conversion would have set, leaving only in-memory-only state untouched.
    fn update_from(&mut self, from: T);
}
85
/// In-memory representation of a database: the durable fields from
/// [`durable::Database`] plus indexes of the schemas it contains.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    /// Name of the database.
    pub name: String,
    /// Stable ID of the database.
    pub id: DatabaseId,
    /// OID of the database.
    pub oid: u32,
    /// Schemas contained in this database, keyed by schema ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Index from schema name to schema ID.
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    /// Role that owns this database.
    pub owner_id: RoleId,
    /// Privileges granted on this database.
    pub privileges: PrivilegeMap,
}
97
98impl From<Database> for durable::Database {
99    fn from(database: Database) -> durable::Database {
100        durable::Database {
101            id: database.id,
102            oid: database.oid,
103            name: database.name,
104            owner_id: database.owner_id,
105            privileges: database.privileges.into_all_values().collect(),
106        }
107    }
108}
109
110impl From<durable::Database> for Database {
111    fn from(
112        durable::Database {
113            id,
114            oid,
115            name,
116            owner_id,
117            privileges,
118        }: durable::Database,
119    ) -> Database {
120        Database {
121            id,
122            oid,
123            schemas_by_id: BTreeMap::new(),
124            schemas_by_name: BTreeMap::new(),
125            name,
126            owner_id,
127            privileges: PrivilegeMap::from_mz_acl_items(privileges),
128        }
129    }
130}
131
132impl UpdateFrom<durable::Database> for Database {
133    fn update_from(
134        &mut self,
135        durable::Database {
136            id,
137            oid,
138            name,
139            owner_id,
140            privileges,
141        }: durable::Database,
142    ) {
143        self.id = id;
144        self.oid = oid;
145        self.name = name;
146        self.owner_id = owner_id;
147        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
148    }
149}
150
/// In-memory representation of a schema: the durable fields from
/// [`durable::Schema`] plus name-based indexes of the items it contains.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
    /// Fully qualified name of the schema (database + schema).
    pub name: QualifiedSchemaName,
    /// Specifier identifying this schema.
    pub id: SchemaSpecifier,
    /// OID of the schema.
    pub oid: u32,
    /// Items in this schema, keyed by item name.
    pub items: BTreeMap<String, CatalogItemId>,
    /// Functions in this schema, keyed by function name.
    pub functions: BTreeMap<String, CatalogItemId>,
    /// Types in this schema, keyed by type name.
    pub types: BTreeMap<String, CatalogItemId>,
    /// Role that owns this schema.
    pub owner_id: RoleId,
    /// Privileges granted on this schema.
    pub privileges: PrivilegeMap,
}
162
163impl From<Schema> for durable::Schema {
164    fn from(schema: Schema) -> durable::Schema {
165        durable::Schema {
166            id: schema.id.into(),
167            oid: schema.oid,
168            name: schema.name.schema,
169            database_id: schema.name.database.id(),
170            owner_id: schema.owner_id,
171            privileges: schema.privileges.into_all_values().collect(),
172        }
173    }
174}
175
176impl From<durable::Schema> for Schema {
177    fn from(
178        durable::Schema {
179            id,
180            oid,
181            name,
182            database_id,
183            owner_id,
184            privileges,
185        }: durable::Schema,
186    ) -> Schema {
187        Schema {
188            name: QualifiedSchemaName {
189                database: database_id.into(),
190                schema: name,
191            },
192            id: id.into(),
193            oid,
194            items: BTreeMap::new(),
195            functions: BTreeMap::new(),
196            types: BTreeMap::new(),
197            owner_id,
198            privileges: PrivilegeMap::from_mz_acl_items(privileges),
199        }
200    }
201}
202
203impl UpdateFrom<durable::Schema> for Schema {
204    fn update_from(
205        &mut self,
206        durable::Schema {
207            id,
208            oid,
209            name,
210            database_id,
211            owner_id,
212            privileges,
213        }: durable::Schema,
214    ) {
215        self.name = QualifiedSchemaName {
216            database: database_id.into(),
217            schema: name,
218        };
219        self.id = id.into();
220        self.oid = oid;
221        self.owner_id = owner_id;
222        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
223    }
224}
225
/// In-memory representation of a role, mirroring [`durable::Role`].
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    /// Name of the role.
    pub name: String,
    /// Stable ID of the role.
    pub id: RoleId,
    /// OID of the role.
    pub oid: u32,
    /// Attributes of the role.
    pub attributes: RoleAttributes,
    /// The role's memberships in other roles.
    pub membership: RoleMembership,
    /// Per-role session variable settings.
    pub vars: RoleVars,
}
235
236impl Role {
237    pub fn is_user(&self) -> bool {
238        self.id.is_user()
239    }
240
241    pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
242        self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
243    }
244}
245
246impl From<Role> for durable::Role {
247    fn from(role: Role) -> durable::Role {
248        durable::Role {
249            id: role.id,
250            oid: role.oid,
251            name: role.name,
252            attributes: role.attributes,
253            membership: role.membership,
254            vars: role.vars,
255        }
256    }
257}
258
259impl From<durable::Role> for Role {
260    fn from(
261        durable::Role {
262            id,
263            oid,
264            name,
265            attributes,
266            membership,
267            vars,
268        }: durable::Role,
269    ) -> Self {
270        Role {
271            name,
272            id,
273            oid,
274            attributes,
275            membership,
276            vars,
277        }
278    }
279}
280
281impl UpdateFrom<durable::Role> for Role {
282    fn update_from(
283        &mut self,
284        durable::Role {
285            id,
286            oid,
287            name,
288            attributes,
289            membership,
290            vars,
291        }: durable::Role,
292    ) {
293        self.id = id;
294        self.oid = oid;
295        self.name = name;
296        self.attributes = attributes;
297        self.membership = membership;
298        self.vars = vars;
299    }
300}
301
/// In-memory representation of a role's authentication information, mirroring
/// [`durable::RoleAuth`].
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RoleAuth {
    /// The role this authentication information belongs to.
    pub role_id: RoleId,
    /// Hashed password, if one is set.
    pub password_hash: Option<String>,
    /// When this record was last updated — presumably an epoch-based
    /// timestamp; units are defined by `durable::RoleAuth` (TODO confirm).
    pub updated_at: u64,
}
308
309impl From<RoleAuth> for durable::RoleAuth {
310    fn from(role_auth: RoleAuth) -> durable::RoleAuth {
311        durable::RoleAuth {
312            role_id: role_auth.role_id,
313            password_hash: role_auth.password_hash,
314            updated_at: role_auth.updated_at,
315        }
316    }
317}
318
319impl From<durable::RoleAuth> for RoleAuth {
320    fn from(
321        durable::RoleAuth {
322            role_id,
323            password_hash,
324            updated_at,
325        }: durable::RoleAuth,
326    ) -> RoleAuth {
327        RoleAuth {
328            role_id,
329            password_hash,
330            updated_at,
331        }
332    }
333}
334
335impl UpdateFrom<durable::RoleAuth> for RoleAuth {
336    fn update_from(&mut self, from: durable::RoleAuth) {
337        self.role_id = from.role_id;
338        self.password_hash = from.password_hash;
339    }
340}
341
/// In-memory representation of a cluster: the durable fields from
/// [`durable::Cluster`] plus runtime state (replicas, log indexes, bound
/// objects) that is populated separately.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    /// Name of the cluster.
    pub name: String,
    /// Stable ID of the cluster.
    pub id: ClusterId,
    /// Configuration of the cluster (variant, workload class, ...).
    pub config: ClusterConfig,
    /// Introspection source indexes installed on this cluster, by log variant.
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    /// Objects bound to this cluster. Does not include introspection source
    /// indexes.
    pub bound_objects: BTreeSet<CatalogItemId>,
    /// Index from replica name to replica ID.
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    /// Replicas of this cluster, keyed by replica ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    /// Role that owns this cluster.
    pub owner_id: RoleId,
    /// Privileges granted on this cluster.
    pub privileges: PrivilegeMap,
}
358
impl Cluster {
    /// The role of the cluster. Currently used to set alert severity.
    pub fn role(&self) -> ClusterRole {
        // NOTE - These roles power monitoring systems. Do not change
        // them without talking to the cloud or observability groups.
        if self.name == MZ_SYSTEM_CLUSTER.name {
            ClusterRole::SystemCritical
        } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
            ClusterRole::System
        } else {
            ClusterRole::User
        }
    }

    /// Returns `true` if the cluster is a managed cluster.
    pub fn is_managed(&self) -> bool {
        matches!(self.config.variant, ClusterVariant::Managed { .. })
    }

    /// Lists the user replicas, which are those that do not have the internal flag set.
    pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.replicas().filter(|r| !r.config.location.internal())
    }

    /// Lists all replicas in the cluster
    pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.replicas_by_id_.values()
    }

    /// Lookup a replica by ID.
    pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
        self.replicas_by_id_.get(&replica_id)
    }

    /// Lookup a replica ID by name.
    pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
        self.replica_id_by_name_.get(name).copied()
    }

    /// Returns the availability zones of this cluster, if they exist.
    pub fn availability_zones(&self) -> Option<&[String]> {
        match &self.config.variant {
            ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
            ClusterVariant::Unmanaged => None,
        }
    }

    /// Reconstructs a [`CreateClusterPlan`] from this cluster's current
    /// in-memory configuration.
    ///
    /// # Errors
    ///
    /// Returns [`PlanError::Unsupported`] for unmanaged clusters, which this
    /// conversion does not (yet) support.
    pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
        let name = self.name.clone();
        let variant = match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged {
                size,
                availability_zones,
                logging,
                replication_factor,
                optimizer_feature_overrides,
                schedule,
            }) => {
                // Introspection is configured iff a logging interval is set;
                // `log_logging` alone (with no interval) disables it.
                let introspection = match logging {
                    ReplicaLogging {
                        log_logging,
                        interval: Some(interval),
                    } => Some(ComputeReplicaIntrospectionConfig {
                        debugging: *log_logging,
                        interval: interval.clone(),
                    }),
                    ReplicaLogging {
                        log_logging: _,
                        interval: None,
                    } => None,
                };
                let compute = ComputeReplicaConfig { introspection };
                CreateClusterVariant::Managed(CreateClusterManagedPlan {
                    replication_factor: replication_factor.clone(),
                    size: size.clone(),
                    availability_zones: availability_zones.clone(),
                    compute,
                    optimizer_feature_overrides: optimizer_feature_overrides.clone(),
                    schedule: schedule.clone(),
                })
            }
            ClusterVariant::Unmanaged => {
                // Unmanaged clusters are deprecated, so hopefully we can remove
                // them before we have to implement this.
                return Err(PlanError::Unsupported {
                    feature: "SHOW CREATE for unmanaged clusters".to_string(),
                    discussion_no: None,
                });
            }
        };
        let workload_class = self.config.workload_class.clone();
        Ok(CreateClusterPlan {
            name,
            variant,
            workload_class,
        })
    }
}
457
458impl From<Cluster> for durable::Cluster {
459    fn from(cluster: Cluster) -> durable::Cluster {
460        durable::Cluster {
461            id: cluster.id,
462            name: cluster.name,
463            owner_id: cluster.owner_id,
464            privileges: cluster.privileges.into_all_values().collect(),
465            config: cluster.config.into(),
466        }
467    }
468}
469
470impl From<durable::Cluster> for Cluster {
471    fn from(
472        durable::Cluster {
473            id,
474            name,
475            owner_id,
476            privileges,
477            config,
478        }: durable::Cluster,
479    ) -> Self {
480        Cluster {
481            name: name.clone(),
482            id,
483            bound_objects: BTreeSet::new(),
484            log_indexes: BTreeMap::new(),
485            replica_id_by_name_: BTreeMap::new(),
486            replicas_by_id_: BTreeMap::new(),
487            owner_id,
488            privileges: PrivilegeMap::from_mz_acl_items(privileges),
489            config: config.into(),
490        }
491    }
492}
493
494impl UpdateFrom<durable::Cluster> for Cluster {
495    fn update_from(
496        &mut self,
497        durable::Cluster {
498            id,
499            name,
500            owner_id,
501            privileges,
502            config,
503        }: durable::Cluster,
504    ) {
505        self.id = id;
506        self.name = name;
507        self.owner_id = owner_id;
508        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
509        self.config = config.into();
510    }
511}
512
/// In-memory representation of a single replica of a cluster.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    /// Name of the replica.
    pub name: String,
    /// ID of the cluster this replica belongs to.
    pub cluster_id: ClusterId,
    /// Stable ID of the replica.
    pub replica_id: ReplicaId,
    /// The replica's configuration.
    pub config: ReplicaConfig,
    /// Role that owns this replica.
    pub owner_id: RoleId,
}
521
522impl From<ClusterReplica> for durable::ClusterReplica {
523    fn from(replica: ClusterReplica) -> durable::ClusterReplica {
524        durable::ClusterReplica {
525            cluster_id: replica.cluster_id,
526            replica_id: replica.replica_id,
527            name: replica.name,
528            config: replica.config.into(),
529            owner_id: replica.owner_id,
530        }
531    }
532}
533
/// A status observation for a single process of a cluster replica.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    /// The observed status.
    pub status: ClusterStatus,
    /// Wall-clock time at which `status` was recorded.
    pub time: DateTime<Utc>,
}
539
/// A set of references available from a source, along with when that set was
/// last refreshed.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
    /// When these references were last updated — units per
    /// `durable::SourceReferences` (TODO confirm).
    pub updated_at: u64,
    /// The individual references.
    pub references: Vec<SourceReference>,
}
545
/// A single object that a source can reference.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
    /// Name of the referenced object.
    pub name: String,
    /// Namespace of the referenced object, if it has one.
    pub namespace: Option<String>,
    /// Column names of the referenced object.
    pub columns: Vec<String>,
}
552
553impl From<SourceReference> for durable::SourceReference {
554    fn from(source_reference: SourceReference) -> durable::SourceReference {
555        durable::SourceReference {
556            name: source_reference.name,
557            namespace: source_reference.namespace,
558            columns: source_reference.columns,
559        }
560    }
561}
562
563impl SourceReferences {
564    pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
565        durable::SourceReferences {
566            source_id,
567            updated_at: self.updated_at,
568            references: self.references.into_iter().map(Into::into).collect(),
569        }
570    }
571}
572
573impl From<durable::SourceReference> for SourceReference {
574    fn from(source_reference: durable::SourceReference) -> SourceReference {
575        SourceReference {
576            name: source_reference.name,
577            namespace: source_reference.namespace,
578            columns: source_reference.columns,
579        }
580    }
581}
582
583impl From<durable::SourceReferences> for SourceReferences {
584    fn from(source_references: durable::SourceReferences) -> SourceReferences {
585        SourceReferences {
586            updated_at: source_references.updated_at,
587            references: source_references
588                .references
589                .into_iter()
590                .map(|source_reference| source_reference.into())
591                .collect(),
592        }
593    }
594}
595
596impl From<mz_sql::plan::SourceReference> for SourceReference {
597    fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
598        SourceReference {
599            name: source_reference.name,
600            namespace: source_reference.namespace,
601            columns: source_reference.columns,
602        }
603    }
604}
605
606impl From<mz_sql::plan::SourceReferences> for SourceReferences {
607    fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
608        SourceReferences {
609            updated_at: source_references.updated_at,
610            references: source_references
611                .references
612                .into_iter()
613                .map(|source_reference| source_reference.into())
614                .collect(),
615        }
616    }
617}
618
619impl From<SourceReferences> for mz_sql::plan::SourceReferences {
620    fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
621        mz_sql::plan::SourceReferences {
622            updated_at: source_references.updated_at,
623            references: source_references
624                .references
625                .into_iter()
626                .map(|source_reference| source_reference.into())
627                .collect(),
628        }
629    }
630}
631
632impl From<SourceReference> for mz_sql::plan::SourceReference {
633    fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
634        mz_sql::plan::SourceReference {
635            name: source_reference.name,
636            namespace: source_reference.namespace,
637            columns: source_reference.columns,
638        }
639    }
640}
641
/// A named, owned item in the catalog along with its dependency links.
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    /// The item's definition.
    pub item: CatalogItem,
    /// Items that reference this entry. NOTE(review): the exact distinction
    /// from `used_by` is defined elsewhere in the crate — confirm before
    /// relying on it.
    #[serde(skip)]
    pub referenced_by: Vec<CatalogItemId>,
    // TODO(database-issues#7922)––this should have an invariant tied to it that all
    // dependents (i.e. entries in this field) have IDs greater than this
    // entry's ID.
    /// Items that use this entry.
    #[serde(skip)]
    pub used_by: Vec<CatalogItemId>,
    /// Stable ID of this entry.
    pub id: CatalogItemId,
    /// OID of this entry.
    pub oid: u32,
    /// Fully qualified name of this entry.
    pub name: QualifiedItemName,
    /// Role that owns this entry.
    pub owner_id: RoleId,
    /// Privileges granted on this entry.
    pub privileges: PrivilegeMap,
}
658
/// A [`CatalogEntry`] that is associated with a specific "collection" of data.
/// A single item in the catalog may be associated with multiple "collections".
///
/// Here "collection" generally means a pTVC, e.g. a Persist Shard, an Index, a
/// currently running dataflow, etc.
///
/// Items in the Catalog have a stable name -> ID mapping, in other words for
/// the entire lifetime of an object its [`CatalogItemId`] will _never_ change.
/// Similarly, we need to maintain a stable mapping from [`GlobalId`] to pTVC.
/// This presents a challenge when `ALTER`-ing an object, e.g. adding columns
/// to a table. We can't just change the schema of the underlying Persist Shard
/// because that would be rebinding the [`GlobalId`] of the pTVC. Instead we
/// allocate a new [`GlobalId`] to refer to the new version of the table, and
/// then the [`CatalogEntry`] tracks the [`GlobalId`] for each version.
///
/// TODO(ct): Add a note here if we end up using this for associating continual
/// tasks with a single catalog item.
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
    /// The underlying catalog entry.
    pub entry: CatalogEntry,
    /// Which version of the entry's relation this instance refers to.
    pub version: RelationVersionSelector,
}
681
682impl CatalogCollectionEntry {
683    pub fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
684        self.item().relation_desc(self.version)
685    }
686}
687
688impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
689    fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
690        self.item().relation_desc(self.version)
691    }
692
693    fn global_id(&self) -> GlobalId {
694        self.entry
695            .item()
696            .global_id_for_version(self.version)
697            .expect("catalog corruption, missing version!")
698    }
699}
700
701impl Deref for CatalogCollectionEntry {
702    type Target = CatalogEntry;
703
704    fn deref(&self) -> &CatalogEntry {
705        &self.entry
706    }
707}
708
/// Delegating implementation of [`mz_sql::catalog::CatalogItem`]: every method
/// forwards to the wrapped [`CatalogEntry`], with `at_version` layering in the
/// version selector carried by this entry.
impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
    fn name(&self) -> &QualifiedItemName {
        self.entry.name()
    }

    fn id(&self) -> CatalogItemId {
        self.entry.id()
    }

    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.entry.global_ids())
    }

    fn oid(&self) -> u32 {
        self.entry.oid()
    }

    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.entry.func()
    }

    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.entry.source_desc()
    }

    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        // Fully-qualified syntax disambiguates from any inherent `connection`
        // method on `CatalogEntry`.
        mz_sql::catalog::CatalogItem::connection(&self.entry)
    }

    fn create_sql(&self) -> &str {
        self.entry.create_sql()
    }

    fn item_type(&self) -> SqlCatalogItemType {
        self.entry.item_type()
    }

    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        self.entry.index_details()
    }

    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        self.entry.writable_table_details()
    }

    fn replacement_target(&self) -> Option<CatalogItemId> {
        self.entry.replacement_target()
    }

    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        self.entry.type_details()
    }

    fn references(&self) -> &ResolvedIds {
        self.entry.references()
    }

    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.entry.uses()
    }

    fn referenced_by(&self) -> &[CatalogItemId] {
        self.entry.referenced_by()
    }

    fn used_by(&self) -> &[CatalogItemId] {
        self.entry.used_by()
    }

    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.entry.subsource_details()
    }

    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.entry.source_export_details()
    }

    fn is_progress_source(&self) -> bool {
        self.entry.is_progress_source()
    }

    fn progress_id(&self) -> Option<CatalogItemId> {
        self.entry.progress_id()
    }

    fn owner_id(&self) -> RoleId {
        *self.entry.owner_id()
    }

    fn privileges(&self) -> &PrivilegeMap {
        self.entry.privileges()
    }

    fn cluster_id(&self) -> Option<ClusterId> {
        self.entry.item().cluster_id()
    }

    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        // Clones the underlying entry so the boxed item owns its data
        // independently of `self`.
        Box::new(CatalogCollectionEntry {
            entry: self.entry.clone(),
            version,
        })
    }

    fn latest_version(&self) -> Option<RelationVersion> {
        self.entry.latest_version()
    }
}
832
/// The definition of a single catalog item, with one variant per item type.
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    Table(Table),
    Source(Source),
    Log(Log),
    View(View),
    MaterializedView(MaterializedView),
    Sink(Sink),
    Index(Index),
    Type(Type),
    Func(Func),
    Secret(Secret),
    Connection(Connection),
    ContinualTask(ContinualTask),
}
848
849impl From<CatalogEntry> for durable::Item {
850    fn from(entry: CatalogEntry) -> durable::Item {
851        let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
852        durable::Item {
853            id: entry.id,
854            oid: entry.oid,
855            global_id,
856            schema_id: entry.name.qualifiers.schema_spec.into(),
857            name: entry.name.item,
858            create_sql,
859            owner_id: entry.owner_id,
860            privileges: entry.privileges.into_all_values().collect(),
861            extra_versions,
862        }
863    }
864}
865
/// In-memory representation of a table.
#[derive(Debug, Clone, Serialize)]
pub struct Table {
    /// Parse-able SQL that defines this table.
    pub create_sql: Option<String>,
    /// [`VersionedRelationDesc`] of this table, derived from the `create_sql`.
    pub desc: VersionedRelationDesc,
    /// Versions of this table, and the [`GlobalId`]s that refer to them.
    ///
    /// An ordered map, so the entry with the greatest key is the latest
    /// version (relied upon by `global_id_writes`).
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this table, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the table's logical compaction window is controlled by the ['metrics_retention']
    /// session variable.
    ///
    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
    /// Where data for this table comes from, e.g. `INSERT` statements or an upstream source.
    pub data_source: TableDataSource,
}
890
891impl Table {
892    pub fn timeline(&self) -> Timeline {
893        match &self.data_source {
894            // The Coordinator controls insertions for writable tables
895            // (including system tables), so they are realtime.
896            TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
897            TableDataSource::DataSource { timeline, .. } => timeline.clone(),
898        }
899    }
900
901    /// Returns all of the [`GlobalId`]s that this [`Table`] can be referenced by.
902    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
903        self.collections.values().copied()
904    }
905
906    /// Returns the latest [`GlobalId`] for this [`Table`] which should be used for writes.
907    pub fn global_id_writes(&self) -> GlobalId {
908        *self
909            .collections
910            .last_key_value()
911            .expect("at least one version of a table")
912            .1
913    }
914
915    /// Returns all of the collections and their [`RelationDesc`]s associated with this [`Table`].
916    pub fn collection_descs(
917        &self,
918    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
919        self.collections.iter().map(|(version, gid)| {
920            let desc = self
921                .desc
922                .at_version(RelationVersionSelector::Specific(*version));
923            (*gid, *version, desc)
924        })
925    }
926
927    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
928    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
929        let (version, _gid) = self
930            .collections
931            .iter()
932            .find(|(_version, gid)| *gid == id)
933            .expect("GlobalId to exist");
934        self.desc
935            .at_version(RelationVersionSelector::Specific(*version))
936    }
937}
938
/// Where a [`Table`] gets its data from.
#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
    /// The table owns data created via INSERT/UPDATE/DELETE statements.
    TableWrites {
        /// Default expressions for the table's columns.
        #[serde(skip)]
        defaults: Vec<Expr<Aug>>,
    },

    /// The table receives its data from the identified `DataSourceDesc`.
    /// This table type does not support INSERT/UPDATE/DELETE statements.
    DataSource {
        /// Description of where the data comes from.
        desc: DataSourceDesc,
        /// Timeline in which the incoming data is timestamped.
        timeline: Timeline,
    },
}
954
/// Describes where a source gets its data from.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum DataSourceDesc {
    /// Receives data from an external system.
    Ingestion {
        desc: SourceDesc<ReferencedConnection>,
        cluster_id: ClusterId,
    },
    /// Receives data from an external system (old `CREATE SOURCE` syntax).
    OldSyntaxIngestion {
        desc: SourceDesc<ReferencedConnection>,
        cluster_id: ClusterId,
        // If we're dealing with an old syntax ingestion the progress id will be some other collection
        // and the ingestion itself will have the data from an external reference
        progress_subsource: CatalogItemId,
        data_config: SourceExportDataConfig<ReferencedConnection>,
        details: SourceExportDetails,
    },
    /// This source receives its data from the identified ingestion,
    /// specifically the output identified by `external_reference`.
    /// N.B. that `external_reference` should not be used to identify
    /// anything downstream of purification, as the purification process
    /// encodes source-specific identifiers into the `details` struct.
    /// The `external_reference` field is only used here for displaying
    /// human-readable names in system tables.
    IngestionExport {
        ingestion_id: CatalogItemId,
        external_reference: UnresolvedItemName,
        details: SourceExportDetails,
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// Receives introspection data from an internal system.
    Introspection(IntrospectionType),
    /// Receives data from the source's reclocking/remapping operations.
    Progress,
    /// Receives data from HTTP requests.
    Webhook {
        /// Optional components used to validate a webhook request.
        validate_using: Option<WebhookValidation>,
        /// Describes how we deserialize the body of a webhook request.
        body_format: WebhookBodyFormat,
        /// Describes whether or not to include headers and how to map them.
        headers: WebhookHeaders,
        /// The cluster which this source is associated with.
        cluster_id: ClusterId,
    },
    /// Exposes the contents of the catalog shard.
    Catalog,
}
1003
1004impl From<IntrospectionType> for DataSourceDesc {
1005    fn from(typ: IntrospectionType) -> Self {
1006        Self::Introspection(typ)
1007    }
1008}
1009
1010impl DataSourceDesc {
1011    /// The key and value formats of the data source.
1012    pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1013        match &self {
1014            DataSourceDesc::Ingestion { .. } => (None, None),
1015            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1016                match &data_config.encoding.as_ref() {
1017                    Some(encoding) => match &encoding.key {
1018                        Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1019                        None => (None, Some(encoding.value.type_())),
1020                    },
1021                    None => (None, None),
1022                }
1023            }
1024            DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1025                Some(encoding) => match &encoding.key {
1026                    Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1027                    None => (None, Some(encoding.value.type_())),
1028                },
1029                None => (None, None),
1030            },
1031            DataSourceDesc::Introspection(_)
1032            | DataSourceDesc::Webhook { .. }
1033            | DataSourceDesc::Progress
1034            | DataSourceDesc::Catalog => (None, None),
1035        }
1036    }
1037
1038    /// Envelope of the data source.
1039    pub fn envelope(&self) -> Option<&str> {
1040        // Note how "none"/"append-only" is different from `None`. Source
1041        // sources don't have an envelope (internal logs, for example), while
1042        // other sources have an envelope that we call the "NONE"-envelope.
1043
1044        fn envelope_string(envelope: &SourceEnvelope) -> &str {
1045            match envelope {
1046                SourceEnvelope::None(_) => "none",
1047                SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1048                    mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1049                    mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1050                        // NOTE(aljoscha): Should we somehow mark that this is
1051                        // using upsert internally? See note above about
1052                        // DEBEZIUM.
1053                        "debezium"
1054                    }
1055                    mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1056                        "upsert-value-err-inline"
1057                    }
1058                },
1059                SourceEnvelope::CdcV2 => {
1060                    // TODO(aljoscha): Should we even report this? It's
1061                    // currently not exposed.
1062                    "materialize"
1063                }
1064            }
1065        }
1066
1067        match self {
1068            // NOTE(aljoscha): We could move the block for ingestions into
1069            // `SourceEnvelope` itself, but that one feels more like an internal
1070            // thing and adapter should own how we represent envelopes as a
1071            // string? It would not be hard to convince me otherwise, though.
1072            DataSourceDesc::Ingestion { .. } => None,
1073            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1074                Some(envelope_string(&data_config.envelope))
1075            }
1076            DataSourceDesc::IngestionExport { data_config, .. } => {
1077                Some(envelope_string(&data_config.envelope))
1078            }
1079            DataSourceDesc::Introspection(_)
1080            | DataSourceDesc::Webhook { .. }
1081            | DataSourceDesc::Progress
1082            | DataSourceDesc::Catalog => None,
1083        }
1084    }
1085}
1086
/// An in-memory catalog entry for a source.
#[derive(Debug, Clone, Serialize)]
pub struct Source {
    /// Parse-able SQL that defines this source.
    pub create_sql: Option<String>,
    /// [`GlobalId`] used to reference this source from outside the catalog.
    pub global_id: GlobalId,
    /// Where this source's data comes from.
    // TODO: Unskip: currently blocked on some inner BTreeMap<X, _> problems.
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    /// [`RelationDesc`] of this source, derived from the `create_sql`.
    pub desc: RelationDesc,
    /// The timeline this source exists on.
    pub timeline: Timeline,
    /// Other catalog objects referenced by this source, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// This value is ignored for subsources, i.e. for
    /// [`DataSourceDesc::IngestionExport`]. Instead, it uses the primary
    /// source's logical compaction window.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the source's logical compaction window is controlled by
    /// METRICS_RETENTION
    pub is_retained_metrics_object: bool,
}
1110
impl Source {
    /// Creates a new `Source`.
    ///
    /// `plan` is the planner's output for the `CREATE SOURCE` statement; the
    /// remaining arguments carry catalog-level metadata that the planner does
    /// not produce itself.
    ///
    /// # Panics
    /// - If an ingestion-based plan is not given a cluster_id.
    /// - If a non-ingestion-based source has a defined cluster config in its plan.
    /// - If a non-ingestion-based source is given a cluster_id.
    pub fn new(
        plan: CreateSourcePlan,
        global_id: GlobalId,
        resolved_ids: ResolvedIds,
        custom_logical_compaction_window: Option<CompactionWindow>,
        is_retained_metrics_object: bool,
    ) -> Source {
        Source {
            create_sql: Some(plan.source.create_sql),
            // Translate the planner's data-source description into the
            // catalog's representation, attaching the plan's cluster ID where
            // the source kind requires one.
            data_source: match plan.source.data_source {
                mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
                    desc,
                    cluster_id: plan
                        .in_cluster
                        .expect("ingestion-based sources must be given a cluster ID"),
                },
                mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
                    desc,
                    progress_subsource,
                    data_config,
                    details,
                } => DataSourceDesc::OldSyntaxIngestion {
                    desc,
                    cluster_id: plan
                        .in_cluster
                        .expect("ingestion-based sources must be given a cluster ID"),
                    progress_subsource,
                    data_config,
                    details,
                },
                mz_sql::plan::DataSourceDesc::Progress => {
                    // Subsources must not carry their own cluster ID.
                    assert!(
                        plan.in_cluster.is_none(),
                        "subsources must not have a host config or cluster_id defined"
                    );
                    DataSourceDesc::Progress
                }
                mz_sql::plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => {
                    // Subsources must not carry their own cluster ID.
                    assert!(
                        plan.in_cluster.is_none(),
                        "subsources must not have a host config or cluster_id defined"
                    );
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    }
                }
                mz_sql::plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id,
                } => {
                    // The webhook's cluster comes from `plan.in_cluster`; a
                    // `cluster_id` on the plan's data source is unexpected and
                    // is ignored here after logging.
                    mz_ore::soft_assert_or_log!(
                        cluster_id.is_none(),
                        "cluster_id set at Source level for Webhooks"
                    );
                    DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id: plan
                            .in_cluster
                            .expect("webhook sources must be given a cluster ID"),
                    }
                }
            },
            desc: plan.source.desc,
            global_id,
            timeline: plan.timeline,
            resolved_ids,
            // A compaction window from the statement itself takes precedence
            // over the caller-provided default.
            custom_logical_compaction_window: plan
                .source
                .compaction_window
                .or(custom_logical_compaction_window),
            is_retained_metrics_object,
        }
    }

    /// Type of the source, e.g. the name of the external system it connects to.
    pub fn source_type(&self) -> &str {
        match &self.data_source {
            DataSourceDesc::Ingestion { desc, .. }
            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
            DataSourceDesc::Progress => "progress",
            DataSourceDesc::IngestionExport { .. } => "subsource",
            DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => "source",
            DataSourceDesc::Webhook { .. } => "webhook",
        }
    }

    /// Connection ID of the source, if one exists.
    pub fn connection_id(&self) -> Option<CatalogItemId> {
        match &self.data_source {
            DataSourceDesc::Ingestion { desc, .. }
            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
            // These source kinds do not reference a catalog connection object.
            DataSourceDesc::IngestionExport { .. }
            | DataSourceDesc::Introspection(_)
            | DataSourceDesc::Webhook { .. }
            | DataSourceDesc::Progress
            | DataSourceDesc::Catalog => None,
        }
    }

    /// The single [`GlobalId`] that refers to this Source.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }

    /// The expensive resource that each source consumes is persist shards. To
    /// prevent abuse, we want to prevent users from creating sources that use an
    /// unbounded number of persist shards. But we also don't want to count
    /// persist shards that are mandated by the system (e.g., the progress
    /// shard) so that future versions of Materialize can introduce additional
    /// per-source shards (e.g., a per-source status shard) without impacting
    /// the limit calculation.
    pub fn user_controllable_persist_shard_count(&self) -> i64 {
        match &self.data_source {
            DataSourceDesc::Ingestion { .. } => 0,
            DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                match &desc.connection {
                    // These multi-output sources do not use their primary
                    // source's data shard, so we don't include it in accounting
                    // for users.
                    GenericSourceConnection::Postgres(_)
                    | GenericSourceConnection::MySql(_)
                    | GenericSourceConnection::SqlServer(_) => 0,
                    GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
                        // Load generators that output data in their primary shard
                        LoadGenerator::Clock
                        | LoadGenerator::Counter { .. }
                        | LoadGenerator::Datums
                        | LoadGenerator::KeyValue(_) => 1,
                        // Multi-output load generators: data lives in subsources.
                        LoadGenerator::Auction
                        | LoadGenerator::Marketing
                        | LoadGenerator::Tpch { .. } => 0,
                    },
                    GenericSourceConnection::Kafka(_) => 1,
                }
            }
            //  DataSourceDesc::IngestionExport represents a subsource, which
            //  use a data shard.
            DataSourceDesc::IngestionExport { .. } => 1,
            DataSourceDesc::Webhook { .. } => 1,
            // Introspection, catalog, and progress subsources are not under the user's control, so
            // shouldn't count toward their quota.
            DataSourceDesc::Introspection(_)
            | DataSourceDesc::Progress
            | DataSourceDesc::Catalog => 0,
        }
    }
}
1277
/// An in-memory catalog entry for a built-in log.
#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// The category of data this log stores.
    pub variant: LogVariant,
    /// [`GlobalId`] used to reference this log from outside the catalog.
    pub global_id: GlobalId,
}
1285
1286impl Log {
1287    /// The single [`GlobalId`] that refers to this Log.
1288    pub fn global_id(&self) -> GlobalId {
1289        self.global_id
1290    }
1291}
1292
/// An in-memory catalog entry for a sink.
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    /// Parse-able SQL that defines this sink.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this sink from outside the catalog, e.g. storage.
    pub global_id: GlobalId,
    /// Collection we read into this sink.
    pub from: GlobalId,
    /// Connection to the external service we're sinking into, e.g. Kafka.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// Envelope we use to sink into the external system.
    ///
    /// TODO(guswynn): this probably should just be in the `connection`.
    pub envelope: SinkEnvelope,
    /// Emit an initial snapshot into the sink.
    pub with_snapshot: bool,
    /// Used to fence other writes into this sink as we evolve the upstream materialized view.
    pub version: u64,
    /// Other catalog objects this sink references.
    pub resolved_ids: ResolvedIds,
    /// Cluster this sink runs on.
    pub cluster_id: ClusterId,
    /// Commit interval for the sink.
    pub commit_interval: Option<Duration>,
}
1318
1319impl Sink {
1320    pub fn sink_type(&self) -> &str {
1321        self.connection.name()
1322    }
1323
1324    /// Envelope of the sink.
1325    pub fn envelope(&self) -> Option<&str> {
1326        match &self.envelope {
1327            SinkEnvelope::Debezium => Some("debezium"),
1328            SinkEnvelope::Upsert => Some("upsert"),
1329            SinkEnvelope::Append => Some("append"),
1330        }
1331    }
1332
1333    /// Output a combined format string of the sink. For legacy reasons
1334    /// if the key-format is none or the key & value formats are
1335    /// both the same (either avro or json), we return the value format name,
1336    /// otherwise we return a composite name.
1337    pub fn combined_format(&self) -> Option<Cow<'_, str>> {
1338        match &self.connection {
1339            StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
1340            StorageSinkConnection::Iceberg(_) => None,
1341        }
1342    }
1343
1344    /// Output distinct key_format and value_format of the sink.
1345    pub fn formats(&self) -> Option<(Option<&str>, &str)> {
1346        match &self.connection {
1347            StorageSinkConnection::Kafka(connection) => {
1348                let key_format = connection
1349                    .format
1350                    .key_format
1351                    .as_ref()
1352                    .map(|f| f.get_format_name());
1353                let value_format = connection.format.value_format.get_format_name();
1354                Some((key_format, value_format))
1355            }
1356            StorageSinkConnection::Iceberg(_) => None,
1357        }
1358    }
1359
1360    pub fn connection_id(&self) -> Option<CatalogItemId> {
1361        self.connection.connection_id()
1362    }
1363
1364    /// The single [`GlobalId`] that this Sink can be referenced by.
1365    pub fn global_id(&self) -> GlobalId {
1366        self.global_id
1367    }
1368}
1369
/// An in-memory catalog entry for a (non-materialized) view.
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// Parse-able SQL that defines this view.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this view from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// Unoptimized high-level expression from parsing the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression from (locally) optimizing the `raw_expr`.
    pub locally_optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Columns of this view.
    pub desc: RelationDesc,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects that are referenced by this view, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this view.
    pub dependencies: DependencyIds,
}
1389
1390impl View {
1391    /// The single [`GlobalId`] this [`View`] can be referenced by.
1392    pub fn global_id(&self) -> GlobalId {
1393        self.global_id
1394    }
1395}
1396
/// An in-memory catalog entry for a materialized view.
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// Parse-able SQL that defines this materialized view.
    pub create_sql: String,
    /// Versions of this materialized view, and the [`GlobalId`]s that refer to them.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// Raw high-level expression from planning, derived from the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression, derived from the `raw_expr`.
    pub locally_optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// [`VersionedRelationDesc`] of this materialized view, derived from the `create_sql`.
    pub desc: VersionedRelationDesc,
    /// Other catalog items that this materialized view references, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this materialized view.
    pub dependencies: DependencyIds,
    /// ID of the materialized view this materialized view is intended to replace.
    pub replacement_target: Option<CatalogItemId>,
    /// Cluster that this materialized view runs on.
    pub cluster_id: ClusterId,
    /// If set, only install this materialized view's dataflow on the specified replica.
    pub target_replica: Option<ReplicaId>,
    /// Column indexes that we assert are not `NULL`.
    ///
    /// TODO(parkmycar): Switch this to use the `ColumnIdx` type.
    pub non_null_assertions: Vec<usize>,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Schedule to refresh this materialized view, e.g. set via `REFRESH EVERY` option.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// The initial `as_of` of the storage collection associated with the materialized view.
    ///
    /// Note: This doesn't change upon restarts.
    /// (The dataflow's initial `as_of` can be different.)
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
    // The catalog `dump` method uses serde to serialize catalog state, e.g., Testdrive catalog
    // consistency checks do two dumps and compare them. One of these states comes from the durable
    // catalog, but the following fields are not restored when the consistency check loads the
    // durable catalog, hence we need `#[serde(skip)]`.
    /// Optimized global MIR plan, set after global optimization.
    #[serde(skip)]
    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical (LIR) plan, set after physical optimization.
    #[serde(skip)]
    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
    /// Dataflow metainfo (optimizer notices, etc.), set after optimization.
    #[serde(skip)]
    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
}
1447
impl MaterializedView {
    /// Returns all [`GlobalId`]s that this [`MaterializedView`] can be referenced by.
    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.collections.values().copied()
    }

    /// The latest [`GlobalId`] for this [`MaterializedView`] which represents the writing
    /// version.
    ///
    /// # Panics
    ///
    /// Panics if `collections` is empty, which would be an invariant violation.
    pub fn global_id_writes(&self) -> GlobalId {
        *self
            .collections
            .last_key_value()
            .expect("at least one version of a materialized view")
            .1
    }

    /// Returns all collections and their [`RelationDesc`]s associated with this [`MaterializedView`].
    pub fn collection_descs(
        &self,
    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
        self.collections.iter().map(|(version, gid)| {
            // Project the versioned desc down to this specific version.
            let desc = self
                .desc
                .at_version(RelationVersionSelector::Specific(*version));
            (*gid, *version, desc)
        })
    }

    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
    ///
    /// # Panics
    ///
    /// Panics if `id` is not one of this materialized view's collections.
    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
        let (version, _gid) = self
            .collections
            .iter()
            .find(|(_version, gid)| *gid == id)
            .expect("GlobalId to exist");
        self.desc
            .at_version(RelationVersionSelector::Specific(*version))
    }

    /// Apply the given replacement materialized view to this [`MaterializedView`].
    ///
    /// The replacement's definition (query, cluster, options, plans) takes over,
    /// while this entry keeps its original name and gains a new collection
    /// version that points at the replacement's write collection.
    ///
    /// # Panics
    ///
    /// Panics if `replacement.replacement_target` is `None`, or if either
    /// `create_sql` does not parse as a `CREATE MATERIALIZED VIEW` statement.
    pub fn apply_replacement(&mut self, replacement: Self) {
        let target_id = replacement
            .replacement_target
            .expect("replacement has target");

        // Parses catalog-persisted SQL into a `CREATE MATERIALIZED VIEW`
        // statement, panicking on anything else (an invariant violation).
        fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
            let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
                panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
            });
            if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
                cmvs
            } else {
                panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
            }
        }

        // Build the merged statement: keep the original name, take everything
        // else (columns, cluster, query, options) from the replacement.
        let old_stmt = parse(&self.create_sql);
        let rpl_stmt = parse(&replacement.create_sql);
        let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
            if_exists: old_stmt.if_exists,
            name: old_stmt.name,
            columns: rpl_stmt.columns,
            replacement_for: None,
            in_cluster: rpl_stmt.in_cluster,
            in_cluster_replica: rpl_stmt.in_cluster_replica,
            query: rpl_stmt.query,
            as_of: rpl_stmt.as_of,
            with_options: rpl_stmt.with_options,
        };
        let create_sql = new_stmt.to_ast_string_stable();

        let mut collections = std::mem::take(&mut self.collections);
        // Note: We can't use `self.desc.latest_version` here because a replacement doesn't
        // necessary evolve the relation schema, so that version might be lower than the actual
        // latest version.
        let latest_version = collections.keys().max().expect("at least one version");
        let new_version = latest_version.bump();
        collections.insert(new_version, replacement.global_id_writes());

        // The replacement referenced its target (i.e. this entry); remove that
        // self-reference from the dependency sets before adopting them.
        let mut resolved_ids = replacement.resolved_ids;
        resolved_ids.remove_item(&target_id);
        let mut dependencies = replacement.dependencies;
        dependencies.0.remove(&target_id);

        *self = Self {
            create_sql,
            collections,
            raw_expr: replacement.raw_expr,
            locally_optimized_expr: replacement.locally_optimized_expr,
            desc: replacement.desc,
            resolved_ids,
            dependencies,
            replacement_target: None,
            cluster_id: replacement.cluster_id,
            target_replica: replacement.target_replica,
            non_null_assertions: replacement.non_null_assertions,
            custom_logical_compaction_window: replacement.custom_logical_compaction_window,
            refresh_schedule: replacement.refresh_schedule,
            initial_as_of: replacement.initial_as_of,
            optimized_plan: replacement.optimized_plan,
            physical_plan: replacement.physical_plan,
            dataflow_metainfo: replacement.dataflow_metainfo,
        };
    }
}
1553
/// An in-memory catalog entry for an index.
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// Parse-able SQL that defines this index.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this index from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// The [`GlobalId`] this Index is on.
    pub on: GlobalId,
    /// Keys of the index.
    pub keys: Arc<[MirScalarExpr]>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this index, e.g. the object we're indexing.
    pub resolved_ids: ResolvedIds,
    /// Cluster this index is installed on.
    pub cluster_id: ClusterId,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the index's logical compaction window is controlled by the ['metrics_retention']
    /// session variable.
    ///
    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
    // The catalog `dump` method uses serde to serialize catalog state, e.g., Testdrive catalog
    // consistency checks do two dumps and compare them. One of these states comes from the durable
    // catalog, but the following fields are not restored when the consistency check loads the
    // durable catalog, hence we need `#[serde(skip)]`.
    /// Optimized global MIR plan, set after global optimization.
    #[serde(skip)]
    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical (LIR) plan, set after physical optimization.
    #[serde(skip)]
    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
    /// Dataflow metainfo (optimizer notices, etc.), set after optimization.
    #[serde(skip)]
    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
}
1591
1592impl Index {
1593    /// The [`GlobalId`] that refers to this Index.
1594    pub fn global_id(&self) -> GlobalId {
1595        self.global_id
1596    }
1597}
1598
/// An in-memory catalog entry for a SQL type.
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// Parse-able SQL that defines this type.
    pub create_sql: Option<String>,
    /// [`GlobalId`] used to reference this type from outside the catalog.
    pub global_id: GlobalId,
    /// Details describing this type; skipped during serialization.
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// Other catalog objects referenced by this type.
    pub resolved_ids: ResolvedIds,
}
1610
/// An in-memory catalog entry for a built-in function.
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// Static definition of the function; skipped during serialization.
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
    /// [`GlobalId`] used to reference this function from outside the catalog.
    pub global_id: GlobalId,
}
1619
/// An in-memory catalog entry for a secret.
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// Parse-able SQL that defines this secret.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this secret from outside the catalog.
    pub global_id: GlobalId,
}
1627
/// An in-memory catalog entry for a connection.
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// Parse-able SQL that defines this connection.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this connection from the storage layer.
    pub global_id: GlobalId,
    /// The kind of connection.
    pub details: ConnectionDetails,
    /// Other objects this connection depends on.
    pub resolved_ids: ResolvedIds,
}
1639
1640impl Connection {
1641    /// The single [`GlobalId`] used to reference this connection.
1642    pub fn global_id(&self) -> GlobalId {
1643        self.global_id
1644    }
1645}
1646
/// An in-memory representation of a catalog continual task.
#[derive(Debug, Clone, Serialize)]
pub struct ContinualTask {
    /// Parse-able SQL that defines this continual task.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this continual task from outside the catalog.
    pub global_id: GlobalId,
    /// [`GlobalId`] of the collection that we read into this continual task.
    pub input_id: GlobalId,
    /// Whether the task starts from a snapshot of its input.
    // NOTE(review): exact snapshot semantics are decided by the planner — confirm there.
    pub with_snapshot: bool,
    /// ContinualTasks are self-referential. We make this work by using a
    /// placeholder `LocalId` for the CT itself through name resolution and
    /// planning. Then we fill in the real `GlobalId` before constructing this
    /// catalog item.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Columns for this continual task.
    pub desc: RelationDesc,
    /// Other catalog items that this continual task references, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this continual task.
    pub dependencies: DependencyIds,
    /// Cluster that this continual task runs on.
    pub cluster_id: ClusterId,
    /// See the comment on [MaterializedView::initial_as_of].
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
    // The catalog `dump` method uses serde to serialize catalog state, e.g., Testdrive catalog
    // consistency checks do two dumps and compare them. One of these states comes from the durable
    // catalog, but the following fields are not restored when the consistency check loads the
    // durable catalog, hence we need `#[serde(skip)]`.
    /// Optimized global MIR plan, set after global optimization.
    #[serde(skip)]
    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical (LIR) plan, set after physical optimization.
    #[serde(skip)]
    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
    /// Dataflow metainfo (optimizer notices, etc.), set after optimization.
    #[serde(skip)]
    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
}
1685
impl ContinualTask {
    /// The single [`GlobalId`] used to reference this continual task.
    ///
    /// Note this is the id of the task itself, not of its input (see
    /// [`ContinualTask::input_id`]).
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1692
/// An in-memory representation of a catalog network policy.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
    /// Name of the policy.
    pub name: String,
    /// Stable catalog identifier for the policy.
    pub id: NetworkPolicyId,
    /// Object identifier (OID) of the policy.
    pub oid: u32,
    /// The rules that make up this policy.
    pub rules: Vec<NetworkPolicyRule>,
    /// Role that owns the policy.
    pub owner_id: RoleId,
    /// Privileges granted on the policy.
    pub privileges: PrivilegeMap,
}
1702
1703impl From<NetworkPolicy> for durable::NetworkPolicy {
1704    fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1705        durable::NetworkPolicy {
1706            id: policy.id,
1707            oid: policy.oid,
1708            name: policy.name,
1709            rules: policy.rules,
1710            owner_id: policy.owner_id,
1711            privileges: policy.privileges.into_all_values().collect(),
1712        }
1713    }
1714}
1715
1716impl From<durable::NetworkPolicy> for NetworkPolicy {
1717    fn from(
1718        durable::NetworkPolicy {
1719            id,
1720            oid,
1721            name,
1722            rules,
1723            owner_id,
1724            privileges,
1725        }: durable::NetworkPolicy,
1726    ) -> Self {
1727        NetworkPolicy {
1728            id,
1729            oid,
1730            name,
1731            rules,
1732            owner_id,
1733            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1734        }
1735    }
1736}
1737
1738impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1739    fn update_from(
1740        &mut self,
1741        durable::NetworkPolicy {
1742            id,
1743            oid,
1744            name,
1745            rules,
1746            owner_id,
1747            privileges,
1748        }: durable::NetworkPolicy,
1749    ) {
1750        self.id = id;
1751        self.oid = oid;
1752        self.name = name;
1753        self.rules = rules;
1754        self.owner_id = owner_id;
1755        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1756    }
1757}
1758
1759impl CatalogItem {
1760    /// Returns a string indicating the type of this catalog entry.
1761    pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1762        match self {
1763            CatalogItem::Table(_) => CatalogItemType::Table,
1764            CatalogItem::Source(_) => CatalogItemType::Source,
1765            CatalogItem::Log(_) => CatalogItemType::Source,
1766            CatalogItem::Sink(_) => CatalogItemType::Sink,
1767            CatalogItem::View(_) => CatalogItemType::View,
1768            CatalogItem::MaterializedView(_) => CatalogItemType::MaterializedView,
1769            CatalogItem::Index(_) => CatalogItemType::Index,
1770            CatalogItem::Type(_) => CatalogItemType::Type,
1771            CatalogItem::Func(_) => CatalogItemType::Func,
1772            CatalogItem::Secret(_) => CatalogItemType::Secret,
1773            CatalogItem::Connection(_) => CatalogItemType::Connection,
1774            CatalogItem::ContinualTask(_) => CatalogItemType::ContinualTask,
1775        }
1776    }
1777
    /// Returns the [`GlobalId`]s that reference this item, if any.
    ///
    /// Most item kinds are backed by a single collection and yield exactly
    /// one id; materialized views and tables can be backed by several
    /// collections, in which case all of their ids are yielded.
    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        // `Either` unifies the two iterator types: `Left` wraps the
        // multi-collection map iterators, `Right` wraps the single-id case.
        let gid = match self {
            CatalogItem::Source(source) => source.global_id,
            CatalogItem::Log(log) => log.global_id,
            CatalogItem::Sink(sink) => sink.global_id,
            CatalogItem::View(view) => view.global_id,
            CatalogItem::MaterializedView(mv) => {
                return itertools::Either::Left(mv.collections.values().copied());
            }
            CatalogItem::ContinualTask(ct) => ct.global_id,
            CatalogItem::Index(index) => index.global_id,
            CatalogItem::Func(func) => func.global_id,
            CatalogItem::Type(ty) => ty.global_id,
            CatalogItem::Secret(secret) => secret.global_id,
            CatalogItem::Connection(conn) => conn.global_id,
            CatalogItem::Table(table) => {
                return itertools::Either::Left(table.collections.values().copied());
            }
        };
        itertools::Either::Right(std::iter::once(gid))
    }
1800
    /// Returns the most up-to-date [`GlobalId`] for this item.
    ///
    /// Note: The only types of object that can have multiple [`GlobalId`]s
    /// are tables and materialized views, which resolve to the id that new
    /// writes should target (`global_id_writes`); every other kind has
    /// exactly one id.
    pub fn latest_global_id(&self) -> GlobalId {
        match self {
            CatalogItem::Source(source) => source.global_id,
            CatalogItem::Log(log) => log.global_id,
            CatalogItem::Sink(sink) => sink.global_id,
            CatalogItem::View(view) => view.global_id,
            CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
            CatalogItem::ContinualTask(ct) => ct.global_id,
            CatalogItem::Index(index) => index.global_id,
            CatalogItem::Func(func) => func.global_id,
            CatalogItem::Type(ty) => ty.global_id,
            CatalogItem::Secret(secret) => secret.global_id,
            CatalogItem::Connection(conn) => conn.global_id,
            CatalogItem::Table(table) => table.global_id_writes(),
        }
    }
1820
1821    /// Returns the optimized global MIR plan, if this item has one.
1822    pub fn optimized_plan(&self) -> Option<&Arc<DataflowDescription<OptimizedMirRelationExpr>>> {
1823        match self {
1824            CatalogItem::Index(idx) => idx.optimized_plan.as_ref(),
1825            CatalogItem::MaterializedView(mv) => mv.optimized_plan.as_ref(),
1826            CatalogItem::ContinualTask(ct) => ct.optimized_plan.as_ref(),
1827            _ => None,
1828        }
1829    }
1830
1831    /// Returns the physical (LIR) plan, if this item has one.
1832    pub fn physical_plan(&self) -> Option<&Arc<DataflowDescription<ComputePlan>>> {
1833        match self {
1834            CatalogItem::Index(idx) => idx.physical_plan.as_ref(),
1835            CatalogItem::MaterializedView(mv) => mv.physical_plan.as_ref(),
1836            CatalogItem::ContinualTask(ct) => ct.physical_plan.as_ref(),
1837            _ => None,
1838        }
1839    }
1840
1841    /// Returns the dataflow metainfo, if this item has one.
1842    pub fn dataflow_metainfo(&self) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
1843        match self {
1844            CatalogItem::Index(idx) => idx.dataflow_metainfo.as_ref(),
1845            CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo.as_ref(),
1846            CatalogItem::ContinualTask(ct) => ct.dataflow_metainfo.as_ref(),
1847            _ => None,
1848        }
1849    }
1850
    /// Returns mutable references to the plan fields (`optimized_plan`,
    /// `physical_plan`, `dataflow_metainfo`) on plan-bearing items
    /// (`Index`, `MaterializedView`, `ContinualTask`), or `None` for
    /// other item kinds.
    ///
    /// Each field comes back as `&mut Option<_>`, so callers can both set
    /// and clear the plans in one place.
    pub fn plan_fields_mut(
        &mut self,
    ) -> Option<(
        &mut Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
        &mut Option<Arc<DataflowDescription<ComputePlan>>>,
        &mut Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
    )> {
        match self {
            CatalogItem::Index(idx) => Some((
                &mut idx.optimized_plan,
                &mut idx.physical_plan,
                &mut idx.dataflow_metainfo,
            )),
            CatalogItem::MaterializedView(mv) => Some((
                &mut mv.optimized_plan,
                &mut mv.physical_plan,
                &mut mv.dataflow_metainfo,
            )),
            CatalogItem::ContinualTask(ct) => Some((
                &mut ct.optimized_plan,
                &mut ct.physical_plan,
                &mut ct.dataflow_metainfo,
            )),
            _ => None,
        }
    }
1881
1882    /// Whether this item represents a storage collection.
1883    pub fn is_storage_collection(&self) -> bool {
1884        match self {
1885            CatalogItem::Table(_)
1886            | CatalogItem::Source(_)
1887            | CatalogItem::MaterializedView(_)
1888            | CatalogItem::Sink(_)
1889            | CatalogItem::ContinualTask(_) => true,
1890            CatalogItem::Log(_)
1891            | CatalogItem::View(_)
1892            | CatalogItem::Index(_)
1893            | CatalogItem::Type(_)
1894            | CatalogItem::Func(_)
1895            | CatalogItem::Secret(_)
1896            | CatalogItem::Connection(_) => false,
1897        }
1898    }
1899
    /// Returns the [`RelationDesc`] for items that yield rows, at the requested
    /// version.
    ///
    /// Tables and materialized views honor `version` (via `at_version`) so
    /// callers can ask for the schema that matches a specific [`GlobalId`] or
    /// historical definition. Other relation types ignore `version` because
    /// they have a single shape. Non-relational items (for example functions,
    /// indexes, sinks, secrets, and connections) return `None`.
    pub fn relation_desc(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
        match &self {
            CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
            // Log descs are derived from the log variant, so they must be owned.
            CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
            CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
            CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
            CatalogItem::MaterializedView(mview) => {
                Some(Cow::Owned(mview.desc.at_version(version)))
            }
            CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
            CatalogItem::Func(_)
            | CatalogItem::Index(_)
            | CatalogItem::Sink(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_)
            | CatalogItem::Type(_) => None,
        }
    }
1926
1927    pub fn func(
1928        &self,
1929        entry: &CatalogEntry,
1930    ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1931        match &self {
1932            CatalogItem::Func(func) => Ok(func.inner),
1933            _ => Err(SqlCatalogError::UnexpectedType {
1934                name: entry.name().item.to_string(),
1935                actual_type: entry.item_type(),
1936                expected_type: CatalogItemType::Func,
1937            }),
1938        }
1939    }
1940
1941    pub fn source_desc(
1942        &self,
1943        entry: &CatalogEntry,
1944    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1945        match &self {
1946            CatalogItem::Source(source) => match &source.data_source {
1947                DataSourceDesc::Ingestion { desc, .. }
1948                | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1949                DataSourceDesc::IngestionExport { .. }
1950                | DataSourceDesc::Introspection(_)
1951                | DataSourceDesc::Webhook { .. }
1952                | DataSourceDesc::Progress
1953                | DataSourceDesc::Catalog => Ok(None),
1954            },
1955            _ => Err(SqlCatalogError::UnexpectedType {
1956                name: entry.name().item.to_string(),
1957                actual_type: entry.item_type(),
1958                expected_type: CatalogItemType::Source,
1959            }),
1960        }
1961    }
1962
1963    /// Reports whether this catalog entry is a progress source.
1964    pub fn is_progress_source(&self) -> bool {
1965        matches!(
1966            self,
1967            CatalogItem::Source(Source {
1968                data_source: DataSourceDesc::Progress,
1969                ..
1970            })
1971        )
1972    }
1973
1974    /// Collects the identifiers of the objects that were encountered when resolving names in the
1975    /// item's DDL statement.
1976    pub fn references(&self) -> &ResolvedIds {
1977        static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1978        match self {
1979            CatalogItem::Func(_) => &*EMPTY,
1980            CatalogItem::Index(idx) => &idx.resolved_ids,
1981            CatalogItem::Sink(sink) => &sink.resolved_ids,
1982            CatalogItem::Source(source) => &source.resolved_ids,
1983            CatalogItem::Log(_) => &*EMPTY,
1984            CatalogItem::Table(table) => &table.resolved_ids,
1985            CatalogItem::Type(typ) => &typ.resolved_ids,
1986            CatalogItem::View(view) => &view.resolved_ids,
1987            CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1988            CatalogItem::Secret(_) => &*EMPTY,
1989            CatalogItem::Connection(connection) => &connection.resolved_ids,
1990            CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
1991        }
1992    }
1993
1994    /// Collects the identifiers of the objects used by this [`CatalogItem`].
1995    ///
1996    /// Like [`CatalogItem::references()`] but also includes objects that are not directly
1997    /// referenced. For example this will include any catalog objects used to implement functions
1998    /// and casts in the item.
1999    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2000        let mut uses: BTreeSet<_> = self.references().items().copied().collect();
2001        match self {
2002            // TODO(jkosh44) This isn't really correct for functions. They may use other objects in
2003            // their implementation. However, currently there's no way to get that information.
2004            CatalogItem::Func(_) => {}
2005            CatalogItem::Index(_) => {}
2006            CatalogItem::Sink(_) => {}
2007            CatalogItem::Source(_) => {}
2008            CatalogItem::Log(_) => {}
2009            CatalogItem::Table(_) => {}
2010            CatalogItem::Type(_) => {}
2011            CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
2012            CatalogItem::MaterializedView(mview) => {
2013                uses.extend(mview.dependencies.0.iter().copied())
2014            }
2015            CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
2016            CatalogItem::Secret(_) => {}
2017            CatalogItem::Connection(_) => {}
2018        }
2019        uses
2020    }
2021
    /// Returns the connection ID that this item belongs to, if this item is
    /// temporary.
    ///
    /// Only views, indexes, and tables can be temporary; every other item
    /// kind always returns `None`.
    pub fn conn_id(&self) -> Option<&ConnectionId> {
        match self {
            CatalogItem::View(view) => view.conn_id.as_ref(),
            CatalogItem::Index(index) => index.conn_id.as_ref(),
            CatalogItem::Table(table) => table.conn_id.as_ref(),
            CatalogItem::Log(_)
            | CatalogItem::Source(_)
            | CatalogItem::Sink(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_) => None,
        }
    }
2040
    /// Sets the connection ID that this item belongs to, which makes it a
    /// temporary item.
    ///
    /// Only views, indexes, and tables store a connection id; for every other
    /// item kind the call is silently a no-op.
    pub fn set_conn_id(&mut self, conn_id: Option<ConnectionId>) {
        match self {
            CatalogItem::View(view) => view.conn_id = conn_id,
            CatalogItem::Index(index) => index.conn_id = conn_id,
            CatalogItem::Table(table) => table.conn_id = conn_id,
            CatalogItem::Log(_)
            | CatalogItem::Source(_)
            | CatalogItem::Sink(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_) => (),
        }
    }
2059
    /// Indicates whether this item is temporary or not.
    ///
    /// An item is temporary exactly when it is bound to a connection; see
    /// [`CatalogItem::conn_id`].
    pub fn is_temporary(&self) -> bool {
        self.conn_id().is_some()
    }
2064
    /// Returns a clone of `self` whose `create_sql` has every reference to
    /// the schema `database_name.cur_schema_name` rewritten to point at
    /// `new_schema_name`. Items without SQL (logs, funcs) are cloned as-is.
    ///
    /// # Errors
    ///
    /// Propagates the `(String, String)` error pair from
    /// `mz_sql::ast::transform::create_stmt_rename_schema_refs` if the
    /// rewrite fails.
    ///
    /// # Panics
    ///
    /// Panics if the stored `create_sql` does not parse; persisted SQL is
    /// expected to always be valid.
    pub fn rename_schema_refs(
        &self,
        database_name: &str,
        cur_schema_name: &str,
        new_schema_name: &str,
    ) -> Result<CatalogItem, (String, String)> {
        // Shared rewrite: parse the stored SQL, rename schema references in
        // the AST, and re-serialize with stable formatting.
        let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
            let mut create_stmt = mz_sql::parse::parse(&create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast;

            // Rename all references to cur_schema_name.
            mz_sql::ast::transform::create_stmt_rename_schema_refs(
                &mut create_stmt,
                database_name,
                cur_schema_name,
                new_schema_name,
            )?;

            Ok(create_stmt.to_ast_string_stable())
        };

        match self {
            // Tables, sources, and types have optional SQL, hence the
            // `map(..).transpose()` dance.
            CatalogItem::Table(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Table(i))
            }
            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
            CatalogItem::Source(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Source(i))
            }
            CatalogItem::Sink(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Sink(i))
            }
            CatalogItem::View(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::View(i))
            }
            CatalogItem::MaterializedView(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::MaterializedView(i))
            }
            CatalogItem::Index(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Index(i))
            }
            CatalogItem::Secret(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Secret(i))
            }
            CatalogItem::Connection(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Connection(i))
            }
            CatalogItem::Type(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Type(i))
            }
            CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
            CatalogItem::ContinualTask(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::ContinualTask(i))
            }
        }
    }
2143
    /// Returns a clone of `self` with all instances of `from` renamed to `to`
    /// (with the option of including the item's own name) or errors if request
    /// is ambiguous.
    ///
    /// # Panics
    ///
    /// Panics if called on a function or type (they cannot be renamed), or if
    /// the stored `create_sql` does not parse.
    pub fn rename_item_refs(
        &self,
        from: FullItemName,
        to_item_name: String,
        rename_self: bool,
    ) -> Result<CatalogItem, String> {
        // Shared rewrite: parse the stored SQL, optionally rename the item
        // itself, rename references, and re-serialize stably.
        let do_rewrite = |create_sql: String| -> Result<String, String> {
            let mut create_stmt = mz_sql::parse::parse(&create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast;
            if rename_self {
                mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
            }
            // Determination of what constitutes an ambiguous request is done here.
            mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
            Ok(create_stmt.to_ast_string_stable())
        };

        match self {
            CatalogItem::Table(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Table(i))
            }
            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
            CatalogItem::Source(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
                Ok(CatalogItem::Source(i))
            }
            CatalogItem::Sink(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Sink(i))
            }
            CatalogItem::View(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::View(i))
            }
            CatalogItem::MaterializedView(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::MaterializedView(i))
            }
            CatalogItem::Index(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Index(i))
            }
            CatalogItem::Secret(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Secret(i))
            }
            CatalogItem::Func(_) | CatalogItem::Type(_) => {
                unreachable!("{}s cannot be renamed", self.typ())
            }
            CatalogItem::Connection(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::Connection(i))
            }
            CatalogItem::ContinualTask(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql)?;
                Ok(CatalogItem::ContinualTask(i))
            }
        }
    }
2218
    /// Returns a clone of `self` with all instances of `old_id` replaced with `new_id`.
    ///
    /// # Panics
    ///
    /// Panics if called on a function or type (their references cannot be
    /// replaced), or if the stored `create_sql` does not parse.
    pub fn replace_item_refs(&self, old_id: CatalogItemId, new_id: CatalogItemId) -> CatalogItem {
        // Shared rewrite: parse, substitute ids in the AST, re-serialize.
        // Infallible (unlike the rename rewrites), so no `Result` here.
        let do_rewrite = |create_sql: String| -> String {
            let mut create_stmt = mz_sql::parse::parse(&create_sql)
                .expect("invalid create sql persisted to catalog")
                .into_element()
                .ast;
            mz_sql::ast::transform::create_stmt_replace_ids(
                &mut create_stmt,
                &[(old_id, new_id)].into(),
            );
            create_stmt.to_ast_string_stable()
        };

        match self {
            CatalogItem::Table(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite);
                CatalogItem::Table(i)
            }
            CatalogItem::Log(i) => CatalogItem::Log(i.clone()),
            CatalogItem::Source(i) => {
                let mut i = i.clone();
                i.create_sql = i.create_sql.map(do_rewrite);
                CatalogItem::Source(i)
            }
            CatalogItem::Sink(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::Sink(i)
            }
            CatalogItem::View(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::View(i)
            }
            CatalogItem::MaterializedView(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::MaterializedView(i)
            }
            CatalogItem::Index(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::Index(i)
            }
            CatalogItem::Secret(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::Secret(i)
            }
            CatalogItem::Func(_) | CatalogItem::Type(_) => {
                unreachable!("references of {}s cannot be replaced", self.typ())
            }
            CatalogItem::Connection(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::Connection(i)
            }
            CatalogItem::ContinualTask(i) => {
                let mut i = i.clone();
                i.create_sql = do_rewrite(i.create_sql);
                CatalogItem::ContinualTask(i)
            }
        }
    }
    /// Updates the retain history for an item. Returns the previous retain history value. Returns
    /// an error if this item does not support retain history.
    ///
    /// `value` is the new `RETAIN HISTORY` option value for the SQL text
    /// (`None` removes the option); `window` is applied to the in-memory
    /// compaction window.
    ///
    /// # Panics
    ///
    /// Panics if `update_sql` succeeds but the item reports no custom
    /// compaction window. The statement match below only admits tables,
    /// indexes, sources, and materialized views, which are expected to have
    /// one — NOTE(review): confirm that invariant holds for all of them.
    pub fn update_retain_history(
        &mut self,
        value: Option<Value>,
        window: CompactionWindow,
    ) -> Result<Option<WithOptionValue<Raw>>, ()> {
        let update = |mut ast: &mut Statement<Raw>| {
            // Each statement type has unique option types. This macro handles them commonly.
            macro_rules! update_retain_history {
                ( $stmt:ident, $opt:ident, $name:ident ) => {{
                    // Replace or add the option.
                    let pos = $stmt
                        .with_options
                        .iter()
                        // In case there are ever multiple, look for the last one.
                        .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
                    if let Some(value) = value {
                        let next = mz_sql_parser::ast::$opt {
                            name: mz_sql_parser::ast::$name::RetainHistory,
                            value: Some(WithOptionValue::RetainHistoryFor(value)),
                        };
                        if let Some(idx) = pos {
                            let previous = $stmt.with_options[idx].clone();
                            $stmt.with_options[idx] = next;
                            previous.value
                        } else {
                            $stmt.with_options.push(next);
                            None
                        }
                    } else {
                        if let Some(idx) = pos {
                            $stmt.with_options.swap_remove(idx).value
                        } else {
                            None
                        }
                    }
                }};
            }
            // The macro expands per statement kind because each kind has its
            // own `*Option`/`*OptionName` types.
            let previous = match &mut ast {
                Statement::CreateTable(stmt) => {
                    update_retain_history!(stmt, TableOption, TableOptionName)
                }
                Statement::CreateIndex(stmt) => {
                    update_retain_history!(stmt, IndexOption, IndexOptionName)
                }
                Statement::CreateSource(stmt) => {
                    update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
                }
                Statement::CreateMaterializedView(stmt) => {
                    update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
                }
                _ => {
                    return Err(());
                }
            };
            Ok(previous)
        };

        // Rewrite the stored SQL first; only then mutate the in-memory
        // compaction window so the two stay consistent on success.
        let res = self.update_sql(update)?;
        let cw = self
            .custom_logical_compaction_window_mut()
            .expect("item must have compaction window");
        *cw = Some(window);
        Ok(res)
    }
2351
2352    /// Updates the timestamp interval for a source. Returns an error if this item is not a source.
2353    pub fn update_timestamp_interval(
2354        &mut self,
2355        value: Option<Value>,
2356        interval: Duration,
2357    ) -> Result<(), ()> {
2358        let update = |ast: &mut Statement<Raw>| {
2359            match ast {
2360                Statement::CreateSource(stmt) => {
2361                    let pos = stmt.with_options.iter().rposition(|o| {
2362                        o.name == mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval
2363                    });
2364                    if let Some(value) = value {
2365                        let next = mz_sql_parser::ast::CreateSourceOption {
2366                            name: mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval,
2367                            value: Some(WithOptionValue::Value(value)),
2368                        };
2369                        if let Some(idx) = pos {
2370                            stmt.with_options[idx] = next;
2371                        } else {
2372                            stmt.with_options.push(next);
2373                        }
2374                    } else {
2375                        if let Some(idx) = pos {
2376                            stmt.with_options.swap_remove(idx);
2377                        }
2378                    }
2379                }
2380                _ => return Err(()),
2381            };
2382            Ok(())
2383        };
2384
2385        self.update_sql(update)?;
2386
2387        // Update the in-memory SourceDesc timestamp_interval.
2388        match self {
2389            CatalogItem::Source(source) => {
2390                match &mut source.data_source {
2391                    DataSourceDesc::Ingestion { desc, .. }
2392                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
2393                        desc.timestamp_interval = interval;
2394                    }
2395                    _ => return Err(()),
2396                }
2397                Ok(())
2398            }
2399            _ => Err(()),
2400        }
2401    }
2402
2403    pub fn add_column(
2404        &mut self,
2405        name: ColumnName,
2406        typ: SqlColumnType,
2407        sql: RawDataType,
2408    ) -> Result<RelationVersion, PlanError> {
2409        let CatalogItem::Table(table) = self else {
2410            return Err(PlanError::Unsupported {
2411                feature: "adding columns to a non-Table".to_string(),
2412                discussion_no: None,
2413            });
2414        };
2415        let next_version = table.desc.add_column(name.clone(), typ);
2416
2417        let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2418            Statement::CreateTable(stmt) => {
2419                let version = ColumnOptionDef {
2420                    name: None,
2421                    option: ColumnOption::Versioned {
2422                        action: ColumnVersioned::Added,
2423                        version: next_version.into(),
2424                    },
2425                };
2426                let column = ColumnDef {
2427                    name: name.into(),
2428                    data_type: sql,
2429                    collation: None,
2430                    options: vec![version],
2431                };
2432                stmt.columns.push(column);
2433                Ok(())
2434            }
2435            _ => Err(()),
2436        };
2437
2438        self.update_sql(update)
2439            .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2440        Ok(next_version)
2441    }
2442
2443    /// Updates the create_sql field of this item. Returns an error if this is a builtin item,
2444    /// otherwise returns f's result.
2445    pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2446    where
2447        F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2448    {
2449        let create_sql = match self {
2450            CatalogItem::Table(Table { create_sql, .. })
2451            | CatalogItem::Type(Type { create_sql, .. })
2452            | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2453            CatalogItem::Sink(Sink { create_sql, .. })
2454            | CatalogItem::View(View { create_sql, .. })
2455            | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2456            | CatalogItem::Index(Index { create_sql, .. })
2457            | CatalogItem::Secret(Secret { create_sql, .. })
2458            | CatalogItem::Connection(Connection { create_sql, .. })
2459            | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
2460            CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2461        };
2462        let Some(create_sql) = create_sql else {
2463            return Err(());
2464        };
2465        let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2466            .expect("non-system items must be parseable")
2467            .into_element()
2468            .ast;
2469        debug!("rewrite: {}", ast.to_ast_string_redacted());
2470        let t = f(&mut ast)?;
2471        *create_sql = ast.to_ast_string_stable();
2472        debug!("rewrote: {}", ast.to_ast_string_redacted());
2473        Ok(t)
2474    }
2475
    /// If the object is considered a "compute object"
    /// (i.e., it is managed by the compute controller),
    /// this function returns its cluster ID. Otherwise, it returns nothing.
    ///
    /// This function differs from `cluster_id` because while all
    /// compute objects run on a cluster, the converse is not true.
    pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
        // Exhaustive match (no wildcard arm): adding a new `CatalogItem` variant
        // forces an explicit decision here.
        match self {
            CatalogItem::Index(index) => Some(index.cluster_id),
            CatalogItem::Table(_)
            | CatalogItem::Source(_)
            | CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Sink(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_) => None,
        }
    }
2498
    /// Returns the ID of the cluster this item runs on, if it runs on a cluster.
    ///
    /// Broader than [`CatalogItem::is_compute_object_on_cluster`]: it also covers
    /// storage-managed objects (sources, sinks, webhooks) and continual tasks.
    pub fn cluster_id(&self) -> Option<ClusterId> {
        match self {
            CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
            CatalogItem::Index(index) => Some(index.cluster_id),
            CatalogItem::Source(source) => match &source.data_source {
                DataSourceDesc::Ingestion { cluster_id, .. }
                | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
                // This is somewhat of a lie because the export runs on the same
                // cluster as its ingestion but we don't yet have a way of
                // cross-referencing the items
                DataSourceDesc::IngestionExport { .. } => None,
                DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
                DataSourceDesc::Introspection(_)
                | DataSourceDesc::Progress
                | DataSourceDesc::Catalog => None,
            },
            CatalogItem::Sink(sink) => Some(sink.cluster_id),
            CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
            CatalogItem::Table(_)
            | CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_) => None,
        }
    }
2526
    /// The custom compaction window, if any has been set. This does not reflect any propagated
    /// compaction window (i.e., source -> subsource).
    pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
        // Only tables, sources, indexes, and materialized views carry a custom
        // compaction window field; all other item kinds cannot have one.
        match self {
            CatalogItem::Table(table) => table.custom_logical_compaction_window,
            CatalogItem::Source(source) => source.custom_logical_compaction_window,
            CatalogItem::Index(index) => index.custom_logical_compaction_window,
            CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
            CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::Sink(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_) => None,
        }
    }
2545
2546    /// Mutable access to the custom compaction window, or None if this type does not support custom
2547    /// compaction windows. This does not reflect any propagated compaction window (i.e., source ->
2548    /// subsource).
2549    pub fn custom_logical_compaction_window_mut(
2550        &mut self,
2551    ) -> Option<&mut Option<CompactionWindow>> {
2552        let cw = match self {
2553            CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2554            CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2555            CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2556            CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2557            CatalogItem::Log(_)
2558            | CatalogItem::View(_)
2559            | CatalogItem::Sink(_)
2560            | CatalogItem::Type(_)
2561            | CatalogItem::Func(_)
2562            | CatalogItem::Secret(_)
2563            | CatalogItem::Connection(_)
2564            | CatalogItem::ContinualTask(_) => return None,
2565        };
2566        Some(cw)
2567    }
2568
2569    /// The initial compaction window, for objects that have one; that is, tables, sources, indexes,
2570    /// and MVs. This does not reflect any propagated compaction window (i.e., source -> subsource).
2571    ///
2572    /// If `custom_logical_compaction_window()` returns something, use that.  Otherwise, use a
2573    /// sensible default (currently 1s).
2574    ///
2575    /// For objects that do not have the concept of compaction window, return None.
2576    pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2577        let custom_logical_compaction_window = match self {
2578            CatalogItem::Table(_)
2579            | CatalogItem::Source(_)
2580            | CatalogItem::Index(_)
2581            | CatalogItem::MaterializedView(_)
2582            | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
2583            CatalogItem::Log(_)
2584            | CatalogItem::View(_)
2585            | CatalogItem::Sink(_)
2586            | CatalogItem::Type(_)
2587            | CatalogItem::Func(_)
2588            | CatalogItem::Secret(_)
2589            | CatalogItem::Connection(_) => return None,
2590        };
2591        Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2592    }
2593
    /// Whether the item's logical compaction window
    /// is controlled by the METRICS_RETENTION
    /// system var.
    pub fn is_retained_metrics_object(&self) -> bool {
        // Only tables, sources, and indexes carry the flag; every other item kind
        // never participates in metrics retention.
        match self {
            CatalogItem::Table(table) => table.is_retained_metrics_object,
            CatalogItem::Source(source) => source.is_retained_metrics_object,
            CatalogItem::Index(index) => index.is_retained_metrics_object,
            CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Sink(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_) => false,
        }
    }
2613
    /// Returns the durable representation of this item: its `create_sql`, the
    /// [`GlobalId`] of its primary (root-version) collection, and — for versioned
    /// items (tables, materialized views) — the remaining per-version collections.
    ///
    /// Non-consuming counterpart of [`CatalogItem::into_serialized`]; clones the
    /// underlying strings and maps.
    ///
    /// # Panics
    ///
    /// Panics on builtin items (logs, funcs, items without `create_sql`) and on
    /// introspection sources, none of which can be serialized.
    pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
        match self {
            CatalogItem::Table(table) => {
                let create_sql = table
                    .create_sql
                    .clone()
                    .expect("builtin tables cannot be serialized");
                // The root version's collection is split out as the primary GlobalId;
                // all later versions travel in the returned map.
                let mut collections = table.collections.clone();
                let global_id = collections
                    .remove(&RelationVersion::root())
                    .expect("at least one version");
                (create_sql, global_id, collections)
            }
            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
            CatalogItem::Source(source) => {
                assert!(
                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
                    "cannot serialize introspection/builtin sources",
                );
                let create_sql = source
                    .create_sql
                    .clone()
                    .expect("builtin sources cannot be serialized");
                (create_sql, source.global_id, BTreeMap::new())
            }
            CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
            CatalogItem::MaterializedView(mview) => {
                // Same root-version split as for tables.
                let mut collections = mview.collections.clone();
                let global_id = collections
                    .remove(&RelationVersion::root())
                    .expect("at least one version");
                (mview.create_sql.clone(), global_id, collections)
            }
            CatalogItem::Index(index) => {
                (index.create_sql.clone(), index.global_id, BTreeMap::new())
            }
            CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
            CatalogItem::Type(typ) => {
                let create_sql = typ
                    .create_sql
                    .clone()
                    .expect("builtin types cannot be serialized");
                (create_sql, typ.global_id, BTreeMap::new())
            }
            CatalogItem::Secret(secret) => {
                (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
            }
            CatalogItem::Connection(connection) => (
                connection.create_sql.clone(),
                connection.global_id,
                BTreeMap::new(),
            ),
            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
            CatalogItem::ContinualTask(ct) => {
                (ct.create_sql.clone(), ct.global_id, BTreeMap::new())
            }
        }
    }
2672
    /// Consumes this item and returns its durable representation: its `create_sql`,
    /// the [`GlobalId`] of its primary (root-version) collection, and — for versioned
    /// items — the remaining per-version collections.
    ///
    /// Like [`CatalogItem::to_serialized`] but takes ownership, avoiding the clones.
    ///
    /// # Panics
    ///
    /// Panics on builtin items (logs, funcs, items without `create_sql`) and on
    /// introspection sources, none of which can be serialized.
    pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
        match self {
            CatalogItem::Table(mut table) => {
                let create_sql = table
                    .create_sql
                    .expect("builtin tables cannot be serialized");
                // Root version becomes the primary GlobalId; later versions stay in the map.
                let global_id = table
                    .collections
                    .remove(&RelationVersion::root())
                    .expect("at least one version");
                (create_sql, global_id, table.collections)
            }
            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
            CatalogItem::Source(source) => {
                assert!(
                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
                    "cannot serialize introspection/builtin sources",
                );
                let create_sql = source
                    .create_sql
                    .expect("builtin sources cannot be serialized");
                (create_sql, source.global_id, BTreeMap::new())
            }
            CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
            CatalogItem::MaterializedView(mut mview) => {
                let global_id = mview
                    .collections
                    .remove(&RelationVersion::root())
                    .expect("at least one version");
                (mview.create_sql, global_id, mview.collections)
            }
            CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
            CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
            CatalogItem::Type(typ) => {
                let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
                (create_sql, typ.global_id, BTreeMap::new())
            }
            CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
            CatalogItem::Connection(connection) => {
                (connection.create_sql, connection.global_id, BTreeMap::new())
            }
            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
            CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
        }
    }
2718
2719    /// Returns a global ID for a specific version selector. Returns `None` if the item does
2720    /// not have versions or if the version does not exist.
2721    pub fn global_id_for_version(&self, version: RelationVersionSelector) -> Option<GlobalId> {
2722        let collections = match self {
2723            CatalogItem::MaterializedView(mv) => &mv.collections,
2724            CatalogItem::Table(table) => &table.collections,
2725            CatalogItem::Source(source) => return Some(source.global_id),
2726            CatalogItem::Log(log) => return Some(log.global_id),
2727            CatalogItem::View(view) => return Some(view.global_id),
2728            CatalogItem::Sink(sink) => return Some(sink.global_id),
2729            CatalogItem::Index(index) => return Some(index.global_id),
2730            CatalogItem::Type(ty) => return Some(ty.global_id),
2731            CatalogItem::Func(func) => return Some(func.global_id),
2732            CatalogItem::Secret(secret) => return Some(secret.global_id),
2733            CatalogItem::Connection(conn) => return Some(conn.global_id),
2734            CatalogItem::ContinualTask(ct) => return Some(ct.global_id),
2735        };
2736        match version {
2737            RelationVersionSelector::Latest => collections.values().last().copied(),
2738            RelationVersionSelector::Specific(version) => collections.get(&version).copied(),
2739        }
2740    }
2741}
2742
impl CatalogEntry {
    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`], if it
    /// produces rows.
    pub fn relation_desc_latest(&self) -> Option<Cow<'_, RelationDesc>> {
        self.item.relation_desc(RelationVersionSelector::Latest)
    }

    /// Reports if the item has columns.
    pub fn has_columns(&self) -> bool {
        match self.item() {
            // Record types have columns even though they do not produce rows.
            CatalogItem::Type(Type { details, .. }) => {
                matches!(details.typ, CatalogType::Record { .. })
            }
            _ => self.relation_desc_latest().is_some(),
        }
    }

    /// Returns the [`mz_sql::func::Func`] associated with this `CatalogEntry`.
    pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.item.func(self)
    }

    /// Returns the inner [`Index`] if this entry is an index, else `None`.
    pub fn index(&self) -> Option<&Index> {
        match self.item() {
            CatalogItem::Index(idx) => Some(idx),
            _ => None,
        }
    }

    /// Returns the inner [`MaterializedView`] if this entry is a materialized view, else `None`.
    pub fn materialized_view(&self) -> Option<&MaterializedView> {
        match self.item() {
            CatalogItem::MaterializedView(mv) => Some(mv),
            _ => None,
        }
    }

    /// Returns the inner [`Table`] if this entry is a table, else `None`.
    pub fn table(&self) -> Option<&Table> {
        match self.item() {
            CatalogItem::Table(tbl) => Some(tbl),
            _ => None,
        }
    }

    /// Returns the inner [`Source`] if this entry is a source, else `None`.
    pub fn source(&self) -> Option<&Source> {
        match self.item() {
            CatalogItem::Source(src) => Some(src),
            _ => None,
        }
    }

    /// Returns the inner [`Sink`] if this entry is a sink, else `None`.
    pub fn sink(&self) -> Option<&Sink> {
        match self.item() {
            CatalogItem::Sink(sink) => Some(sink),
            _ => None,
        }
    }

    /// Returns the inner [`Secret`] if this entry is a secret, else `None`.
    pub fn secret(&self) -> Option<&Secret> {
        match self.item() {
            CatalogItem::Secret(secret) => Some(secret),
            _ => None,
        }
    }

    /// Returns the inner [`Connection`] if this entry is a connection, otherwise a
    /// [`SqlCatalogError::UnknownConnection`] naming this entry's qualified name.
    pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
        match self.item() {
            CatalogItem::Connection(connection) => Ok(connection),
            _ => {
                // Ambient (system) databases contribute no name prefix.
                let db_name = match self.name().qualifiers.database_spec {
                    ResolvedDatabaseSpecifier::Ambient => "".to_string(),
                    ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
                };
                Err(SqlCatalogError::UnknownConnection(format!(
                    "{}{}.{}",
                    db_name,
                    self.name().qualifiers.schema_spec,
                    self.name().item
                )))
            }
        }
    }

    /// Returns the [`mz_storage_types::sources::SourceDesc`] associated with
    /// this `CatalogEntry`, if any.
    pub fn source_desc(
        &self,
    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.item.source_desc(self)
    }

    /// Reports whether this catalog entry is a connection.
    pub fn is_connection(&self) -> bool {
        matches!(self.item(), CatalogItem::Connection(_))
    }

    /// Reports whether this catalog entry is a table.
    pub fn is_table(&self) -> bool {
        matches!(self.item(), CatalogItem::Table(_))
    }

    /// Reports whether this catalog entry is a source. Note that this includes
    /// subsources.
    pub fn is_source(&self) -> bool {
        matches!(self.item(), CatalogItem::Source(_))
    }

    /// Reports whether this catalog entry is a subsource and, if it is, the
    /// ingestion it is an export of, as well as the item it exports.
    pub fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        match &self.item() {
            CatalogItem::Source(source) => match &source.data_source {
                DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config: _,
                } => Some((*ingestion_id, external_reference, details)),
                _ => None,
            },
            _ => None,
        }
    }

    /// Reports whether this catalog entry is a source export and, if it is, the
    /// ingestion it is an export of, as well as the item it exports.
    pub fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        // Unlike `subsource_details`, this also covers tables whose data source is
        // an ingestion export.
        match &self.item() {
            CatalogItem::Source(source) => match &source.data_source {
                DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => Some((*ingestion_id, external_reference, details, data_config)),
                _ => None,
            },
            CatalogItem::Table(table) => match &table.data_source {
                TableDataSource::DataSource {
                    desc:
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference,
                            details,
                            data_config,
                        },
                    timeline: _,
                } => Some((*ingestion_id, external_reference, details, data_config)),
                _ => None,
            },
            _ => None,
        }
    }

    /// Reports whether this catalog entry is a progress source.
    pub fn is_progress_source(&self) -> bool {
        self.item().is_progress_source()
    }

    /// Returns the [`CatalogItemId`] of this entry's progress collection, if this
    /// entry is an ingestion that has one.
    pub fn progress_id(&self) -> Option<CatalogItemId> {
        match &self.item() {
            CatalogItem::Source(source) => match &source.data_source {
                // NOTE(review): new-syntax ingestions use this entry's own ID, while
                // old-syntax ingestions point at a dedicated progress subsource.
                DataSourceDesc::Ingestion { .. } => Some(self.id),
                DataSourceDesc::OldSyntaxIngestion {
                    progress_subsource, ..
                } => Some(*progress_subsource),
                DataSourceDesc::IngestionExport { .. }
                | DataSourceDesc::Introspection(_)
                | DataSourceDesc::Progress
                | DataSourceDesc::Webhook { .. }
                | DataSourceDesc::Catalog => None,
            },
            CatalogItem::Table(_)
            | CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Sink(_)
            | CatalogItem::Index(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_) => None,
        }
    }

    /// Reports whether this catalog entry is a sink.
    pub fn is_sink(&self) -> bool {
        matches!(self.item(), CatalogItem::Sink(_))
    }

    /// Reports whether this catalog entry is a materialized view.
    pub fn is_materialized_view(&self) -> bool {
        matches!(self.item(), CatalogItem::MaterializedView(_))
    }

    /// Reports whether this catalog entry is a view.
    pub fn is_view(&self) -> bool {
        matches!(self.item(), CatalogItem::View(_))
    }

    /// Reports whether this catalog entry is a secret.
    pub fn is_secret(&self) -> bool {
        matches!(self.item(), CatalogItem::Secret(_))
    }

    /// Reports whether this catalog entry is an introspection source.
    pub fn is_introspection_source(&self) -> bool {
        matches!(self.item(), CatalogItem::Log(_))
    }

    /// Reports whether this catalog entry is an index.
    pub fn is_index(&self) -> bool {
        matches!(self.item(), CatalogItem::Index(_))
    }

    /// Reports whether this catalog entry is a continual task.
    pub fn is_continual_task(&self) -> bool {
        matches!(self.item(), CatalogItem::ContinualTask(_))
    }

    /// Reports whether this catalog entry can be treated as a relation, it can produce rows.
    pub fn is_relation(&self) -> bool {
        mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
    }

    /// Collects the identifiers of the objects that were encountered when
    /// resolving names in the item's DDL statement.
    pub fn references(&self) -> &ResolvedIds {
        self.item.references()
    }

    /// Collects the identifiers of the objects used by this [`CatalogEntry`].
    ///
    /// Like [`CatalogEntry::references()`] but also includes objects that are not directly
    /// referenced. For example this will include any catalog objects used to implement functions
    /// and casts in the item.
    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.item.uses()
    }

    /// Returns the `CatalogItem` associated with this catalog entry.
    pub fn item(&self) -> &CatalogItem {
        &self.item
    }

    /// Returns a mutable reference to the `CatalogItem` associated with this
    /// catalog entry.
    pub fn item_mut(&mut self) -> &mut CatalogItem {
        &mut self.item
    }

    /// Returns the [`CatalogItemId`] of this catalog entry.
    pub fn id(&self) -> CatalogItemId {
        self.id
    }

    /// Returns all of the [`GlobalId`]s associated with this item.
    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.item().global_ids()
    }

    /// Returns the latest [`GlobalId`] associated with this item.
    pub fn latest_global_id(&self) -> GlobalId {
        self.item().latest_global_id()
    }

    /// Returns the OID of this catalog entry.
    pub fn oid(&self) -> u32 {
        self.oid
    }

    /// Returns the fully qualified name of this catalog entry.
    pub fn name(&self) -> &QualifiedItemName {
        &self.name
    }

    /// Returns the identifiers of the catalog items that directly reference this item.
    pub fn referenced_by(&self) -> &[CatalogItemId] {
        &self.referenced_by
    }

    /// Returns the identifiers of the catalog items that depend upon this item.
    pub fn used_by(&self) -> &[CatalogItemId] {
        &self.used_by
    }

    /// Returns the connection ID that this item belongs to, if this item is
    /// temporary.
    pub fn conn_id(&self) -> Option<&ConnectionId> {
        self.item.conn_id()
    }

    /// Returns the role ID of the entry owner.
    pub fn owner_id(&self) -> &RoleId {
        &self.owner_id
    }

    /// Returns the privileges of the entry.
    pub fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }

    /// Returns the comment object ID for this entry.
    pub fn comment_object_id(&self) -> CommentObjectId {
        use CatalogItemType::*;
        match self.item_type() {
            Table => CommentObjectId::Table(self.id),
            Source => CommentObjectId::Source(self.id),
            Sink => CommentObjectId::Sink(self.id),
            View => CommentObjectId::View(self.id),
            MaterializedView => CommentObjectId::MaterializedView(self.id),
            Index => CommentObjectId::Index(self.id),
            Func => CommentObjectId::Func(self.id),
            Connection => CommentObjectId::Connection(self.id),
            Type => CommentObjectId::Type(self.id),
            Secret => CommentObjectId::Secret(self.id),
            ContinualTask => CommentObjectId::ContinualTask(self.id),
        }
    }
}
3078
/// In-memory store of comments attached to catalog objects.
///
/// Each object maps to its comments keyed by an optional sub-component index
/// (per the `drop_comments` docs, e.g. individual columns of a relation);
/// `None` keys the comment on the object itself.
#[derive(Debug, Clone, Default)]
pub struct CommentsMap {
    // object -> (sub-component -> comment text). `update_comment` removes inner
    // maps once they become empty, so no empty inner maps are retained.
    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
}
3083
3084impl CommentsMap {
3085    pub fn update_comment(
3086        &mut self,
3087        object_id: CommentObjectId,
3088        sub_component: Option<usize>,
3089        comment: Option<String>,
3090    ) -> Option<String> {
3091        let object_comments = self.map.entry(object_id).or_default();
3092
3093        // Either replace the existing comment, or remove it if comment is None/NULL.
3094        let (empty, prev) = if let Some(comment) = comment {
3095            let prev = object_comments.insert(sub_component, comment);
3096            (false, prev)
3097        } else {
3098            let prev = object_comments.remove(&sub_component);
3099            (object_comments.is_empty(), prev)
3100        };
3101
3102        // Cleanup entries that are now empty.
3103        if empty {
3104            self.map.remove(&object_id);
3105        }
3106
3107        // Return the previous comment, if there was one, for easy removal.
3108        prev
3109    }
3110
3111    /// Remove all comments for `object_id` from the map.
3112    ///
3113    /// Generally there is one comment for a given [`CommentObjectId`], but in the case of
3114    /// relations you can also have comments on the individual columns. Dropping the comments for a
3115    /// relation will also drop all of the comments on any columns.
3116    pub fn drop_comments(
3117        &mut self,
3118        object_ids: &BTreeSet<CommentObjectId>,
3119    ) -> Vec<(CommentObjectId, Option<usize>, String)> {
3120        let mut removed_comments = Vec::new();
3121
3122        for object_id in object_ids {
3123            if let Some(comments) = self.map.remove(object_id) {
3124                let removed = comments
3125                    .into_iter()
3126                    .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
3127                removed_comments.extend(removed);
3128            }
3129        }
3130
3131        removed_comments
3132    }
3133
3134    pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
3135        self.map
3136            .iter()
3137            .map(|(id, comments)| {
3138                comments
3139                    .iter()
3140                    .map(|(pos, comment)| (*id, *pos, comment.as_str()))
3141            })
3142            .flatten()
3143    }
3144
3145    pub fn get_object_comments(
3146        &self,
3147        object_id: CommentObjectId,
3148    ) -> Option<&BTreeMap<Option<usize>, String>> {
3149        self.map.get(&object_id)
3150    }
3151}
3152
3153impl Serialize for CommentsMap {
3154    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
3155    where
3156        S: serde::Serializer,
3157    {
3158        let comment_count = self
3159            .map
3160            .iter()
3161            .map(|(_object_id, comments)| comments.len())
3162            .sum();
3163
3164        let mut seq = serializer.serialize_seq(Some(comment_count))?;
3165        for (object_id, sub) in &self.map {
3166            for (sub_component, comment) in sub {
3167                seq.serialize_element(&(
3168                    format!("{object_id:?}"),
3169                    format!("{sub_component:?}"),
3170                    comment,
3171                ))?;
3172            }
3173        }
3174        seq.end()
3175    }
3176}
3177
/// The set of all default privileges, i.e. privileges that will automatically
/// be granted on newly created objects, keyed by the object specification they
/// apply to.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
pub struct DefaultPrivileges {
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
}
3183
// Use a new type here because otherwise we have two levels of BTreeMap, both needing
// map_key_to_string.
/// The default privileges for a single [`DefaultPrivilegeObject`], keyed by grantee.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
struct RoleDefaultPrivileges(
    /// Denormalized, the key is the grantee Role.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
);
3192
// Let the newtype be used directly as its inner `BTreeMap` for read access.
impl Deref for RoleDefaultPrivileges {
    type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
3200
// Let the newtype be used directly as its inner `BTreeMap` for mutation.
impl DerefMut for RoleDefaultPrivileges {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
3206
3207impl DefaultPrivileges {
3208    /// Add a new default privilege into the set of all default privileges.
3209    pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
3210        if privilege.acl_mode.is_empty() {
3211            return;
3212        }
3213
3214        let privileges = self.privileges.entry(object).or_default();
3215        if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3216            default_privilege.acl_mode |= privilege.acl_mode;
3217        } else {
3218            privileges.insert(privilege.grantee, privilege);
3219        }
3220    }
3221
3222    /// Revoke a default privilege from the set of all default privileges.
3223    pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
3224        if let Some(privileges) = self.privileges.get_mut(object) {
3225            if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3226                default_privilege.acl_mode =
3227                    default_privilege.acl_mode.difference(privilege.acl_mode);
3228                if default_privilege.acl_mode.is_empty() {
3229                    privileges.remove(&privilege.grantee);
3230                }
3231            }
3232            if privileges.is_empty() {
3233                self.privileges.remove(object);
3234            }
3235        }
3236    }
3237
3238    /// Get the privileges that will be granted on all objects matching `object` to `grantee`, if
3239    /// any exist.
3240    pub fn get_privileges_for_grantee(
3241        &self,
3242        object: &DefaultPrivilegeObject,
3243        grantee: &RoleId,
3244    ) -> Option<&AclMode> {
3245        self.privileges
3246            .get(object)
3247            .and_then(|privileges| privileges.get(grantee))
3248            .map(|privilege| &privilege.acl_mode)
3249    }
3250
3251    /// Get all default privileges that apply to the provided object details.
3252    pub fn get_applicable_privileges(
3253        &self,
3254        role_id: RoleId,
3255        database_id: Option<DatabaseId>,
3256        schema_id: Option<SchemaId>,
3257        object_type: mz_sql::catalog::ObjectType,
3258    ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
3259        // Privileges consider all relations to be of type table due to PostgreSQL compatibility. We
3260        // don't require the caller to worry about that and we will map their `object_type` to the
3261        // correct type for privileges.
3262        let privilege_object_type = if object_type.is_relation() {
3263            mz_sql::catalog::ObjectType::Table
3264        } else {
3265            object_type
3266        };
3267        let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
3268
3269        // Collect all entries that apply to the provided object details.
3270        // If either `database_id` or `schema_id` are `None`, then we might end up with duplicate
3271        // entries in the vec below. That's OK because we consolidate the results after.
3272        [
3273            DefaultPrivilegeObject {
3274                role_id,
3275                database_id,
3276                schema_id,
3277                object_type: privilege_object_type,
3278            },
3279            DefaultPrivilegeObject {
3280                role_id,
3281                database_id,
3282                schema_id: None,
3283                object_type: privilege_object_type,
3284            },
3285            DefaultPrivilegeObject {
3286                role_id,
3287                database_id: None,
3288                schema_id: None,
3289                object_type: privilege_object_type,
3290            },
3291            DefaultPrivilegeObject {
3292                role_id: RoleId::Public,
3293                database_id,
3294                schema_id,
3295                object_type: privilege_object_type,
3296            },
3297            DefaultPrivilegeObject {
3298                role_id: RoleId::Public,
3299                database_id,
3300                schema_id: None,
3301                object_type: privilege_object_type,
3302            },
3303            DefaultPrivilegeObject {
3304                role_id: RoleId::Public,
3305                database_id: None,
3306                schema_id: None,
3307                object_type: privilege_object_type,
3308            },
3309        ]
3310        .into_iter()
3311        .filter_map(|object| self.privileges.get(&object))
3312        .flat_map(|acl_map| acl_map.values())
3313        // Consolidate privileges with a common grantee.
3314        .fold(
3315            BTreeMap::new(),
3316            |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
3317                let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
3318                *accum_acl_mode |= *acl_mode;
3319                accum
3320            },
3321        )
3322        .into_iter()
3323        // Restrict the acl_mode to only privileges valid for the provided object type. If the
3324        // default privilege has an object type of Table, then it may contain privileges valid for
3325        // tables but not other relations. If the passed in object type is another relation, then
3326        // we need to remove any privilege that is not valid for the specified relation.
3327        .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
3328        // Filter out empty privileges.
3329        .filter(|(_, acl_mode)| !acl_mode.is_empty())
3330        .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
3331            grantee: *grantee,
3332            acl_mode,
3333        })
3334    }
3335
3336    pub fn iter(
3337        &self,
3338    ) -> impl Iterator<
3339        Item = (
3340            &DefaultPrivilegeObject,
3341            impl Iterator<Item = &DefaultPrivilegeAclItem>,
3342        ),
3343    > {
3344        self.privileges
3345            .iter()
3346            .map(|(object, acl_map)| (object, acl_map.values()))
3347    }
3348}
3349
/// In-memory configuration of a cluster.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterConfig {
    /// Whether the cluster is managed (sized/replicated by the system) or unmanaged.
    pub variant: ClusterVariant,
    /// Optional workload class label for the cluster.
    pub workload_class: Option<String>,
}
3355
3356impl ClusterConfig {
3357    pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
3358        match &self.variant {
3359            ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
3360            ClusterVariant::Unmanaged => None,
3361        }
3362    }
3363}
3364
3365impl From<ClusterConfig> for durable::ClusterConfig {
3366    fn from(config: ClusterConfig) -> Self {
3367        Self {
3368            variant: config.variant.into(),
3369            workload_class: config.workload_class,
3370        }
3371    }
3372}
3373
3374impl From<durable::ClusterConfig> for ClusterConfig {
3375    fn from(config: durable::ClusterConfig) -> Self {
3376        Self {
3377            variant: config.variant.into(),
3378            workload_class: config.workload_class,
3379        }
3380    }
3381}
3382
/// In-memory configuration of a managed cluster.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterVariantManaged {
    /// The size of the cluster's replicas.
    pub size: String,
    /// Availability zones the replicas may be placed in.
    pub availability_zones: Vec<String>,
    /// Logging configuration for the cluster's replicas.
    pub logging: ReplicaLogging,
    /// The number of replicas of the cluster.
    pub replication_factor: u32,
    /// Optimizer feature overrides that apply to dataflows on this cluster.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// The schedule (e.g. refresh-driven on/off) of the cluster.
    pub schedule: ClusterSchedule,
}
3392
3393impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3394    fn from(managed: ClusterVariantManaged) -> Self {
3395        Self {
3396            size: managed.size,
3397            availability_zones: managed.availability_zones,
3398            logging: managed.logging,
3399            replication_factor: managed.replication_factor,
3400            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3401            schedule: managed.schedule,
3402        }
3403    }
3404}
3405
3406impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3407    fn from(managed: durable::ClusterVariantManaged) -> Self {
3408        Self {
3409            size: managed.size,
3410            availability_zones: managed.availability_zones,
3411            logging: managed.logging,
3412            replication_factor: managed.replication_factor,
3413            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3414            schedule: managed.schedule,
3415        }
3416    }
3417}
3418
/// Whether a cluster's replicas are managed by the system or by the user.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterVariant {
    /// The system manages the cluster according to the embedded configuration.
    Managed(ClusterVariantManaged),
    /// The user manages the cluster's replicas directly.
    Unmanaged,
}
3424
3425impl From<ClusterVariant> for durable::ClusterVariant {
3426    fn from(variant: ClusterVariant) -> Self {
3427        match variant {
3428            ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3429            ClusterVariant::Unmanaged => Self::Unmanaged,
3430        }
3431    }
3432}
3433
3434impl From<durable::ClusterVariant> for ClusterVariant {
3435    fn from(variant: durable::ClusterVariant) -> Self {
3436        match variant {
3437            durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3438            durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3439        }
3440    }
3441}
3442
3443impl mz_sql::catalog::CatalogDatabase for Database {
3444    fn name(&self) -> &str {
3445        &self.name
3446    }
3447
3448    fn id(&self) -> DatabaseId {
3449        self.id
3450    }
3451
3452    fn has_schemas(&self) -> bool {
3453        !self.schemas_by_name.is_empty()
3454    }
3455
3456    fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
3457        &self.schemas_by_name
3458    }
3459
3460    // `as` is ok to use to cast to a trait object.
3461    #[allow(clippy::as_conversions)]
3462    fn schemas(&self) -> Vec<&dyn CatalogSchema> {
3463        self.schemas_by_id
3464            .values()
3465            .map(|schema| schema as &dyn CatalogSchema)
3466            .collect()
3467    }
3468
3469    fn owner_id(&self) -> RoleId {
3470        self.owner_id
3471    }
3472
3473    fn privileges(&self) -> &PrivilegeMap {
3474        &self.privileges
3475    }
3476}
3477
3478impl mz_sql::catalog::CatalogSchema for Schema {
3479    fn database(&self) -> &ResolvedDatabaseSpecifier {
3480        &self.name.database
3481    }
3482
3483    fn name(&self) -> &QualifiedSchemaName {
3484        &self.name
3485    }
3486
3487    fn id(&self) -> &SchemaSpecifier {
3488        &self.id
3489    }
3490
3491    fn has_items(&self) -> bool {
3492        !self.items.is_empty()
3493    }
3494
3495    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
3496        Box::new(
3497            self.items
3498                .values()
3499                .chain(self.functions.values())
3500                .chain(self.types.values())
3501                .copied(),
3502        )
3503    }
3504
3505    fn owner_id(&self) -> RoleId {
3506        self.owner_id
3507    }
3508
3509    fn privileges(&self) -> &PrivilegeMap {
3510        &self.privileges
3511    }
3512}
3513
3514impl mz_sql::catalog::CatalogRole for Role {
3515    fn name(&self) -> &str {
3516        &self.name
3517    }
3518
3519    fn id(&self) -> RoleId {
3520        self.id
3521    }
3522
3523    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
3524        &self.membership.map
3525    }
3526
3527    fn attributes(&self) -> &RoleAttributes {
3528        &self.attributes
3529    }
3530
3531    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
3532        &self.vars.map
3533    }
3534}
3535
3536impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
3537    fn name(&self) -> &str {
3538        &self.name
3539    }
3540
3541    fn id(&self) -> NetworkPolicyId {
3542        self.id
3543    }
3544
3545    fn owner_id(&self) -> RoleId {
3546        self.owner_id
3547    }
3548
3549    fn privileges(&self) -> &PrivilegeMap {
3550        &self.privileges
3551    }
3552}
3553
impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
    fn name(&self) -> &str {
        &self.name
    }

    fn id(&self) -> ClusterId {
        self.id
    }

    fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
        &self.bound_objects
    }

    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
        &self.replica_id_by_name_
    }

    // `as` is ok to use to cast to a trait object.
    #[allow(clippy::as_conversions)]
    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
        // Resolves to the inherent `Cluster::replicas` (inherent methods take
        // precedence over trait methods), so this is not a recursive call.
        self.replicas()
            .map(|replica| replica as &dyn CatalogClusterReplica)
            .collect()
    }

    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
        // Delegates to the inherent, fallible `Cluster::replica`; a missing
        // replica means the in-memory catalog disagrees with the caller.
        self.replica(id).expect("catalog out of sync")
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }

    fn is_managed(&self) -> bool {
        // Delegates to the inherent `Cluster::is_managed`.
        self.is_managed()
    }

    /// Returns the replica size for managed clusters; `None` for unmanaged.
    fn managed_size(&self) -> Option<&str> {
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
            ClusterVariant::Unmanaged => None,
        }
    }

    /// Returns the schedule for managed clusters; `None` for unmanaged.
    fn schedule(&self) -> Option<&ClusterSchedule> {
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
            ClusterVariant::Unmanaged => None,
        }
    }

    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
        // Delegates to the inherent `Cluster::try_to_plan`.
        self.try_to_plan()
    }
}
3613
3614impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
3615    fn name(&self) -> &str {
3616        &self.name
3617    }
3618
3619    fn cluster_id(&self) -> ClusterId {
3620        self.cluster_id
3621    }
3622
3623    fn replica_id(&self) -> ReplicaId {
3624        self.replica_id
3625    }
3626
3627    fn owner_id(&self) -> RoleId {
3628        self.owner_id
3629    }
3630
3631    fn internal(&self) -> bool {
3632        self.config.location.internal()
3633    }
3634}
3635
// Most methods here delegate to the inherent `CatalogEntry` methods of the same
// name; inherent methods take precedence over trait methods during resolution,
// so these calls are not recursive.
impl mz_sql::catalog::CatalogItem for CatalogEntry {
    fn name(&self) -> &QualifiedItemName {
        self.name()
    }

    fn id(&self) -> CatalogItemId {
        self.id()
    }

    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.global_ids())
    }

    fn oid(&self) -> u32 {
        self.oid()
    }

    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.func()
    }

    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.source_desc()
    }

    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        Ok(self.connection()?.details.to_connection())
    }

    /// Returns the `CREATE` SQL for the item. Builtin items (and item kinds
    /// whose `create_sql` is optional) yield the placeholder `"<builtin>"`.
    fn create_sql(&self) -> &str {
        match self.item() {
            CatalogItem::Table(Table { create_sql, .. }) => {
                create_sql.as_deref().unwrap_or("<builtin>")
            }
            CatalogItem::Source(Source { create_sql, .. }) => {
                create_sql.as_deref().unwrap_or("<builtin>")
            }
            CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
            CatalogItem::View(View { create_sql, .. }) => create_sql,
            CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
            CatalogItem::Index(Index { create_sql, .. }) => create_sql,
            CatalogItem::Type(Type { create_sql, .. }) => {
                create_sql.as_deref().unwrap_or("<builtin>")
            }
            CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
            CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
            CatalogItem::Func(_) => "<builtin>",
            CatalogItem::Log(_) => "<builtin>",
            CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
        }
    }

    fn item_type(&self) -> SqlCatalogItemType {
        self.item().typ()
    }

    /// Returns the index key expressions and the indexed collection's
    /// [`GlobalId`], if the entry is an index.
    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
            Some((keys, *on))
        } else {
            None
        }
    }

    /// Returns the column default expressions if the entry is a table that
    /// accepts direct writes (as opposed to a source-fed table).
    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        if let CatalogItem::Table(Table {
            data_source: TableDataSource::TableWrites { defaults },
            ..
        }) = self.item()
        {
            Some(defaults.as_slice())
        } else {
            None
        }
    }

    /// Returns the item this materialized view replaces, if the entry is a
    /// materialized view with a replacement target.
    fn replacement_target(&self) -> Option<CatalogItemId> {
        if let CatalogItem::MaterializedView(mv) = self.item() {
            mv.replacement_target
        } else {
            None
        }
    }

    /// Returns the type details, if the entry is a type.
    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        if let CatalogItem::Type(Type { details, .. }) = self.item() {
            Some(details)
        } else {
            None
        }
    }

    fn references(&self) -> &ResolvedIds {
        self.references()
    }

    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.uses()
    }

    fn referenced_by(&self) -> &[CatalogItemId] {
        self.referenced_by()
    }

    fn used_by(&self) -> &[CatalogItemId] {
        self.used_by()
    }

    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.subsource_details()
    }

    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.source_export_details()
    }

    fn is_progress_source(&self) -> bool {
        self.is_progress_source()
    }

    fn progress_id(&self) -> Option<CatalogItemId> {
        self.progress_id()
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }

    fn cluster_id(&self) -> Option<ClusterId> {
        self.item().cluster_id()
    }

    /// Returns a collection view of this entry pinned at `version`.
    ///
    /// Note: this clones the entire entry into the returned wrapper.
    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        Box::new(CatalogCollectionEntry {
            entry: self.clone(),
            version,
        })
    }

    /// Returns the latest relation version; `None` for entries that are not
    /// tables (only tables carry a versioned description here).
    fn latest_version(&self) -> Option<RelationVersion> {
        self.table().map(|t| t.desc.latest_version())
    }
}
3798
/// A single update to the catalog state.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdate {
    /// The kind of object being updated and its contents.
    pub kind: StateUpdateKind,
    /// The timestamp at which the update occurred.
    pub ts: Timestamp,
    /// Whether the update adds or retracts the state.
    pub diff: StateDiff,
}
3806
/// The contents of a single state update.
///
/// Variants are listed in dependency order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum StateUpdateKind {
    /// An update to a role.
    Role(durable::objects::Role),
    /// An update to a role's authentication information.
    RoleAuth(durable::objects::RoleAuth),
    /// An update to a database.
    Database(durable::objects::Database),
    /// An update to a schema.
    Schema(durable::objects::Schema),
    /// An update to a default privilege.
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    /// An update to a system-wide privilege.
    SystemPrivilege(MzAclItem),
    /// An update to a system configuration parameter.
    SystemConfiguration(durable::objects::SystemConfiguration),
    /// An update to a cluster.
    Cluster(durable::objects::Cluster),
    /// An update to a network policy.
    NetworkPolicy(durable::objects::NetworkPolicy),
    /// An update to an introspection source index.
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    /// An update to a cluster replica.
    ClusterReplica(durable::objects::ClusterReplica),
    /// An update to a source's references.
    SourceReferences(durable::objects::SourceReferences),
    /// An update to a system object mapping.
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    // Temporary items are not actually updated via the durable catalog, but
    // this allows us to model them the same way as all other items in parts of
    // the pipeline.
    TemporaryItem(TemporaryItem),
    /// An update to a (non-temporary) catalog item.
    Item(durable::objects::Item),
    /// An update to a comment.
    Comment(durable::objects::Comment),
    /// An update to the audit log.
    AuditLog(durable::objects::AuditLog),
    // Storage updates.
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3836
/// Valid diffs for catalog state updates.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub enum StateDiff {
    /// The state is being removed (corresponds to a diff of -1).
    Retraction,
    /// The state is being added (corresponds to a diff of +1).
    Addition,
}
3843
3844impl From<StateDiff> for Diff {
3845    fn from(diff: StateDiff) -> Self {
3846        match diff {
3847            StateDiff::Retraction => Diff::MINUS_ONE,
3848            StateDiff::Addition => Diff::ONE,
3849        }
3850    }
3851}
3852impl TryFrom<Diff> for StateDiff {
3853    type Error = String;
3854
3855    fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3856        match diff {
3857            Diff::MINUS_ONE => Ok(Self::Retraction),
3858            Diff::ONE => Ok(Self::Addition),
3859            diff => Err(format!("invalid diff {diff}")),
3860        }
3861    }
3862}
3863
/// Information needed to process an update to a temporary item.
#[derive(Debug, Clone, Ord, PartialOrd, PartialEq, Eq)]
pub struct TemporaryItem {
    /// The catalog ID of the item.
    pub id: CatalogItemId,
    /// The item's OID.
    pub oid: u32,
    /// The global ID of the item's primary collection.
    pub global_id: GlobalId,
    /// The schema the item lives in.
    pub schema_id: SchemaId,
    /// The item's name within its schema.
    pub name: String,
    /// The connection this temporary item belongs to, if any.
    pub conn_id: Option<ConnectionId>,
    /// The `CREATE` SQL of the item.
    pub create_sql: String,
    /// The role that owns the item.
    pub owner_id: RoleId,
    /// The privileges granted on the item.
    pub privileges: Vec<MzAclItem>,
    /// Global IDs for additional versions of the item's relation.
    pub extra_versions: BTreeMap<RelationVersion, GlobalId>,
}
3878
impl From<CatalogEntry> for TemporaryItem {
    fn from(entry: CatalogEntry) -> Self {
        // Grab the connection ID before serializing: `to_serialized` consumes
        // data from `entry.item`, so everything read from the item must happen
        // first. NOTE(review): keep this statement order — it is load-bearing
        // for borrow/move checking.
        let conn_id = entry.conn_id().cloned();
        let (create_sql, global_id, extra_versions) = entry.item.to_serialized();

        TemporaryItem {
            id: entry.id,
            oid: entry.oid,
            global_id,
            schema_id: entry.name.qualifiers.schema_spec.into(),
            name: entry.name.item,
            conn_id,
            create_sql,
            owner_id: entry.owner_id,
            privileges: entry.privileges.into_all_values().collect(),
            extra_versions,
        }
    }
}
3898
impl TemporaryItem {
    /// Returns the item's catalog type, derived from its `create_sql`.
    pub fn item_type(&self) -> CatalogItemType {
        item_type(&self.create_sql)
    }
}
3904
/// The same as [`StateUpdateKind`], but without `TemporaryItem` so we can derive [`Ord`].
///
/// NOTE(review): `StateUpdateKind` and `TemporaryItem` both derive `Ord` as
/// well now, so the rationale above may be stale — confirm before relying on it.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum BootstrapStateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    // Storage updates.
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3928
impl From<BootstrapStateUpdateKind> for StateUpdateKind {
    /// Converts a bootstrap update into the general update kind. This is
    /// infallible because [`StateUpdateKind`] is a strict superset of
    /// [`BootstrapStateUpdateKind`].
    fn from(value: BootstrapStateUpdateKind) -> Self {
        match value {
            BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
            BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
            BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
            BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
            BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
                StateUpdateKind::DefaultPrivilege(kind)
            }
            BootstrapStateUpdateKind::SystemPrivilege(kind) => {
                StateUpdateKind::SystemPrivilege(kind)
            }
            BootstrapStateUpdateKind::SystemConfiguration(kind) => {
                StateUpdateKind::SystemConfiguration(kind)
            }
            BootstrapStateUpdateKind::SourceReferences(kind) => {
                StateUpdateKind::SourceReferences(kind)
            }
            BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
            BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
            BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
                StateUpdateKind::IntrospectionSourceIndex(kind)
            }
            BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
            BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
                StateUpdateKind::SystemObjectMapping(kind)
            }
            BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
            BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
            BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
            BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
                StateUpdateKind::StorageCollectionMetadata(kind)
            }
            BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
                StateUpdateKind::UnfinalizedShard(kind)
            }
        }
    }
}
3969
impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
    /// `TemporaryItem` is the only variant with no bootstrap counterpart; it
    /// is returned unchanged as the error.
    type Error = TemporaryItem;

    fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
        match value {
            StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
            StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
            StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
            StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
            StateUpdateKind::DefaultPrivilege(kind) => {
                Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
            }
            StateUpdateKind::SystemPrivilege(kind) => {
                Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
            }
            StateUpdateKind::SystemConfiguration(kind) => {
                Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
            }
            StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
            StateUpdateKind::NetworkPolicy(kind) => {
                Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
            }
            StateUpdateKind::IntrospectionSourceIndex(kind) => {
                Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
            }
            StateUpdateKind::ClusterReplica(kind) => {
                Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
            }
            StateUpdateKind::SourceReferences(kind) => {
                Ok(BootstrapStateUpdateKind::SourceReferences(kind))
            }
            StateUpdateKind::SystemObjectMapping(kind) => {
                Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
            }
            StateUpdateKind::TemporaryItem(kind) => Err(kind),
            StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
            StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
            StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
            StateUpdateKind::StorageCollectionMetadata(kind) => {
                Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
            }
            StateUpdateKind::UnfinalizedShard(kind) => {
                Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
            }
        }
    }
}