mz_catalog/memory/
objects.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The current types used by the in-memory Catalog. Many of the objects in this module are
11//! extremely similar to the objects found in [`crate::durable::objects`] but in a format that is
12//! easier consumed by higher layers.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_compute_client::logging::LogVariant;
24use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
25use mz_controller_types::{ClusterId, ReplicaId};
26use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
27use mz_ore::collections::CollectionExt;
28use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
29use mz_repr::network_policy_id::NetworkPolicyId;
30use mz_repr::optimize::OptimizerFeatureOverrides;
31use mz_repr::refresh_schedule::RefreshSchedule;
32use mz_repr::role_id::RoleId;
33use mz_repr::{
34    CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
35    RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
36};
37use mz_sql::ast::display::AstDisplay;
38use mz_sql::ast::{
39    ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
40    UnresolvedItemName, Value, WithOptionValue,
41};
42use mz_sql::catalog::{
43    CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
44    CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogType,
45    CatalogTypeDetails, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference,
46    RoleAttributes, RoleMembership, RoleVars, SystemObjectType,
47};
48use mz_sql::names::{
49    Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
50    QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
51};
52use mz_sql::plan::{
53    ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
54    CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
55    HirRelationExpr, NetworkPolicyRule, PlanError, WebhookBodyFormat, WebhookHeaders,
56    WebhookValidation,
57};
58use mz_sql::rbac;
59use mz_sql::session::vars::OwnedVarInput;
60use mz_storage_client::controller::IntrospectionType;
61use mz_storage_types::connections::inline::ReferencedConnection;
62use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
63use mz_storage_types::sources::load_generator::LoadGenerator;
64use mz_storage_types::sources::{
65    GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
66    SourceExportDetails, Timeline,
67};
68use serde::ser::SerializeSeq;
69use serde::{Deserialize, Serialize};
70use timely::progress::Antichain;
71use tracing::debug;
72
73use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
74use crate::durable;
75use crate::durable::objects::item_type;
76
/// Used to update `self` from the input value while consuming the input value.
///
/// The `From<T>` supertrait means every implementor can also be built from scratch out of a `T`;
/// `update_from` is the in-place counterpart for a value that already exists.
pub trait UpdateFrom<T>: From<T> {
    /// Updates `self` using the contents of `from`, taking ownership of `from`.
    fn update_from(&mut self, from: T);
}
81
/// In-memory catalog representation of a database.
///
/// Convertible to and from [`durable::Database`]. The two schema maps are not part of that
/// conversion: they are discarded when converting to the durable type and start out empty when
/// converting from it.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    /// Name of the database.
    pub name: String,
    /// ID of the database.
    pub id: DatabaseId,
    /// OID of the database.
    pub oid: u32,
    /// Schemas keyed by their ID; serialized through `mz_ore::serde::map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Schema IDs keyed by name. NOTE(review): presumably kept in sync with `schemas_by_id`
    /// by the code that populates both maps; confirm against callers.
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    /// Role that owns the database.
    pub owner_id: RoleId,
    /// Privileges associated with the database.
    pub privileges: PrivilegeMap,
}
93
94impl From<Database> for durable::Database {
95    fn from(database: Database) -> durable::Database {
96        durable::Database {
97            id: database.id,
98            oid: database.oid,
99            name: database.name,
100            owner_id: database.owner_id,
101            privileges: database.privileges.into_all_values().collect(),
102        }
103    }
104}
105
106impl From<durable::Database> for Database {
107    fn from(
108        durable::Database {
109            id,
110            oid,
111            name,
112            owner_id,
113            privileges,
114        }: durable::Database,
115    ) -> Database {
116        Database {
117            id,
118            oid,
119            schemas_by_id: BTreeMap::new(),
120            schemas_by_name: BTreeMap::new(),
121            name,
122            owner_id,
123            privileges: PrivilegeMap::from_mz_acl_items(privileges),
124        }
125    }
126}
127
128impl UpdateFrom<durable::Database> for Database {
129    fn update_from(
130        &mut self,
131        durable::Database {
132            id,
133            oid,
134            name,
135            owner_id,
136            privileges,
137        }: durable::Database,
138    ) {
139        self.id = id;
140        self.oid = oid;
141        self.name = name;
142        self.owner_id = owner_id;
143        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
144    }
145}
146
/// In-memory catalog representation of a schema.
///
/// Convertible to and from [`durable::Schema`]. The `items`, `functions` and `types` maps are
/// not part of that conversion: they are discarded when converting to the durable type and start
/// out empty when converting from it.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
    /// Name of the schema, qualified by its database.
    pub name: QualifiedSchemaName,
    /// Specifier identifying the schema.
    pub id: SchemaSpecifier,
    /// OID of the schema.
    pub oid: u32,
    /// Catalog item IDs keyed by name.
    pub items: BTreeMap<String, CatalogItemId>,
    /// Catalog item IDs of functions, keyed by name.
    pub functions: BTreeMap<String, CatalogItemId>,
    /// Catalog item IDs of types, keyed by name.
    pub types: BTreeMap<String, CatalogItemId>,
    /// Role that owns the schema.
    pub owner_id: RoleId,
    /// Privileges associated with the schema.
    pub privileges: PrivilegeMap,
}
158
159impl From<Schema> for durable::Schema {
160    fn from(schema: Schema) -> durable::Schema {
161        durable::Schema {
162            id: schema.id.into(),
163            oid: schema.oid,
164            name: schema.name.schema,
165            database_id: schema.name.database.id(),
166            owner_id: schema.owner_id,
167            privileges: schema.privileges.into_all_values().collect(),
168        }
169    }
170}
171
172impl From<durable::Schema> for Schema {
173    fn from(
174        durable::Schema {
175            id,
176            oid,
177            name,
178            database_id,
179            owner_id,
180            privileges,
181        }: durable::Schema,
182    ) -> Schema {
183        Schema {
184            name: QualifiedSchemaName {
185                database: database_id.into(),
186                schema: name,
187            },
188            id: id.into(),
189            oid,
190            items: BTreeMap::new(),
191            functions: BTreeMap::new(),
192            types: BTreeMap::new(),
193            owner_id,
194            privileges: PrivilegeMap::from_mz_acl_items(privileges),
195        }
196    }
197}
198
199impl UpdateFrom<durable::Schema> for Schema {
200    fn update_from(
201        &mut self,
202        durable::Schema {
203            id,
204            oid,
205            name,
206            database_id,
207            owner_id,
208            privileges,
209        }: durable::Schema,
210    ) {
211        self.name = QualifiedSchemaName {
212            database: database_id.into(),
213            schema: name,
214        };
215        self.id = id.into();
216        self.oid = oid;
217        self.owner_id = owner_id;
218        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
219    }
220}
221
/// In-memory catalog representation of a role.
///
/// Carries the same fields as [`durable::Role`] and converts to and from it field by field.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    /// Name of the role.
    pub name: String,
    /// ID of the role.
    pub id: RoleId,
    /// OID of the role.
    pub oid: u32,
    /// Attributes of the role.
    pub attributes: RoleAttributes,
    /// Membership information of the role.
    pub membership: RoleMembership,
    /// Variables stored for the role; iterated by [`Role::vars`].
    pub vars: RoleVars,
}
231
232impl Role {
233    pub fn is_user(&self) -> bool {
234        self.id.is_user()
235    }
236
237    pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
238        self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
239    }
240}
241
242impl From<Role> for durable::Role {
243    fn from(role: Role) -> durable::Role {
244        durable::Role {
245            id: role.id,
246            oid: role.oid,
247            name: role.name,
248            attributes: role.attributes,
249            membership: role.membership,
250            vars: role.vars,
251        }
252    }
253}
254
255impl From<durable::Role> for Role {
256    fn from(
257        durable::Role {
258            id,
259            oid,
260            name,
261            attributes,
262            membership,
263            vars,
264        }: durable::Role,
265    ) -> Self {
266        Role {
267            name,
268            id,
269            oid,
270            attributes,
271            membership,
272            vars,
273        }
274    }
275}
276
277impl UpdateFrom<durable::Role> for Role {
278    fn update_from(
279        &mut self,
280        durable::Role {
281            id,
282            oid,
283            name,
284            attributes,
285            membership,
286            vars,
287        }: durable::Role,
288    ) {
289        self.id = id;
290        self.oid = oid;
291        self.name = name;
292        self.attributes = attributes;
293        self.membership = membership;
294        self.vars = vars;
295    }
296}
297
/// In-memory catalog representation of a role's authentication data.
///
/// Carries the same fields as [`durable::RoleAuth`] and converts to and from it field by field.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RoleAuth {
    /// ID of the role this authentication data belongs to.
    pub role_id: RoleId,
    /// Password hash for the role, if one is set.
    pub password_hash: Option<String>,
    /// When this data was last updated. NOTE(review): unit/epoch of this `u64` is not visible
    /// here; confirm against the durable definition.
    pub updated_at: u64,
}
304
305impl From<RoleAuth> for durable::RoleAuth {
306    fn from(role_auth: RoleAuth) -> durable::RoleAuth {
307        durable::RoleAuth {
308            role_id: role_auth.role_id,
309            password_hash: role_auth.password_hash,
310            updated_at: role_auth.updated_at,
311        }
312    }
313}
314
315impl From<durable::RoleAuth> for RoleAuth {
316    fn from(
317        durable::RoleAuth {
318            role_id,
319            password_hash,
320            updated_at,
321        }: durable::RoleAuth,
322    ) -> RoleAuth {
323        RoleAuth {
324            role_id,
325            password_hash,
326            updated_at,
327        }
328    }
329}
330
331impl UpdateFrom<durable::RoleAuth> for RoleAuth {
332    fn update_from(&mut self, from: durable::RoleAuth) {
333        self.role_id = from.role_id;
334        self.password_hash = from.password_hash;
335    }
336}
337
/// In-memory catalog representation of a cluster.
///
/// Convertible to and from [`durable::Cluster`]. The log indexes, bound objects and replica maps
/// are not part of that conversion: they are discarded when converting to the durable type and
/// start out empty when converting from it.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    /// Name of the cluster.
    pub name: String,
    /// ID of the cluster.
    pub id: ClusterId,
    /// Configuration of the cluster, including whether it is managed or unmanaged.
    pub config: ClusterConfig,
    /// Global IDs keyed by log variant. Skipped during serialization.
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    /// Objects bound to this cluster. Does not include introspection source
    /// indexes.
    pub bound_objects: BTreeSet<CatalogItemId>,
    /// Replica IDs keyed by replica name; read through [`Cluster::replica_id`].
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    /// Replicas keyed by ID; read through [`Cluster::replicas`] and [`Cluster::replica`].
    /// Serialized through `mz_ore::serde::map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    /// Role that owns the cluster.
    pub owner_id: RoleId,
    /// Privileges associated with the cluster.
    pub privileges: PrivilegeMap,
}
354
355impl Cluster {
356    /// The role of the cluster. Currently used to set alert severity.
357    pub fn role(&self) -> ClusterRole {
358        // NOTE - These roles power monitoring systems. Do not change
359        // them without talking to the cloud or observability groups.
360        if self.name == MZ_SYSTEM_CLUSTER.name {
361            ClusterRole::SystemCritical
362        } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
363            ClusterRole::System
364        } else {
365            ClusterRole::User
366        }
367    }
368
369    /// Returns `true` if the cluster is a managed cluster.
370    pub fn is_managed(&self) -> bool {
371        matches!(self.config.variant, ClusterVariant::Managed { .. })
372    }
373
374    /// Lists the user replicas, which are those that do not have the internal flag set.
375    pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
376        self.replicas().filter(|r| !r.config.location.internal())
377    }
378
379    /// Lists all replicas in the cluster
380    pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
381        self.replicas_by_id_.values()
382    }
383
384    /// Lookup a replica by ID.
385    pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
386        self.replicas_by_id_.get(&replica_id)
387    }
388
389    /// Lookup a replica ID by name.
390    pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
391        self.replica_id_by_name_.get(name).copied()
392    }
393
394    /// Returns the availability zones of this cluster, if they exist.
395    pub fn availability_zones(&self) -> Option<&[String]> {
396        match &self.config.variant {
397            ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
398            ClusterVariant::Unmanaged => None,
399        }
400    }
401
402    pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
403        let name = self.name.clone();
404        let variant = match &self.config.variant {
405            ClusterVariant::Managed(ClusterVariantManaged {
406                size,
407                availability_zones,
408                logging,
409                replication_factor,
410                optimizer_feature_overrides,
411                schedule,
412            }) => {
413                let introspection = match logging {
414                    ReplicaLogging {
415                        log_logging,
416                        interval: Some(interval),
417                    } => Some(ComputeReplicaIntrospectionConfig {
418                        debugging: *log_logging,
419                        interval: interval.clone(),
420                    }),
421                    ReplicaLogging {
422                        log_logging: _,
423                        interval: None,
424                    } => None,
425                };
426                let compute = ComputeReplicaConfig { introspection };
427                CreateClusterVariant::Managed(CreateClusterManagedPlan {
428                    replication_factor: replication_factor.clone(),
429                    size: size.clone(),
430                    availability_zones: availability_zones.clone(),
431                    compute,
432                    optimizer_feature_overrides: optimizer_feature_overrides.clone(),
433                    schedule: schedule.clone(),
434                })
435            }
436            ClusterVariant::Unmanaged => {
437                // Unmanaged clusters are deprecated, so hopefully we can remove
438                // them before we have to implement this.
439                return Err(PlanError::Unsupported {
440                    feature: "SHOW CREATE for unmanaged clusters".to_string(),
441                    discussion_no: None,
442                });
443            }
444        };
445        let workload_class = self.config.workload_class.clone();
446        Ok(CreateClusterPlan {
447            name,
448            variant,
449            workload_class,
450        })
451    }
452}
453
454impl From<Cluster> for durable::Cluster {
455    fn from(cluster: Cluster) -> durable::Cluster {
456        durable::Cluster {
457            id: cluster.id,
458            name: cluster.name,
459            owner_id: cluster.owner_id,
460            privileges: cluster.privileges.into_all_values().collect(),
461            config: cluster.config.into(),
462        }
463    }
464}
465
466impl From<durable::Cluster> for Cluster {
467    fn from(
468        durable::Cluster {
469            id,
470            name,
471            owner_id,
472            privileges,
473            config,
474        }: durable::Cluster,
475    ) -> Self {
476        Cluster {
477            name: name.clone(),
478            id,
479            bound_objects: BTreeSet::new(),
480            log_indexes: BTreeMap::new(),
481            replica_id_by_name_: BTreeMap::new(),
482            replicas_by_id_: BTreeMap::new(),
483            owner_id,
484            privileges: PrivilegeMap::from_mz_acl_items(privileges),
485            config: config.into(),
486        }
487    }
488}
489
490impl UpdateFrom<durable::Cluster> for Cluster {
491    fn update_from(
492        &mut self,
493        durable::Cluster {
494            id,
495            name,
496            owner_id,
497            privileges,
498            config,
499        }: durable::Cluster,
500    ) {
501        self.id = id;
502        self.name = name;
503        self.owner_id = owner_id;
504        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
505        self.config = config.into();
506    }
507}
508
/// In-memory catalog representation of a cluster replica.
///
/// Converts into [`durable::ClusterReplica`] field by field, with `config` going through `Into`.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    /// Name of the replica.
    pub name: String,
    /// ID of the cluster this replica belongs to.
    pub cluster_id: ClusterId,
    /// ID of the replica.
    pub replica_id: ReplicaId,
    /// Configuration of the replica.
    pub config: ReplicaConfig,
    /// Role that owns the replica.
    pub owner_id: RoleId,
}
517
518impl From<ClusterReplica> for durable::ClusterReplica {
519    fn from(replica: ClusterReplica) -> durable::ClusterReplica {
520        durable::ClusterReplica {
521            cluster_id: replica.cluster_id,
522            replica_id: replica.replica_id,
523            name: replica.name,
524            config: replica.config.into(),
525            owner_id: replica.owner_id,
526        }
527    }
528}
529
/// A cluster status paired with a UTC timestamp.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    /// The reported status.
    pub status: ClusterStatus,
    /// Timestamp associated with `status`. NOTE(review): presumably when the status was
    /// observed; confirm against the code that constructs this value.
    pub time: DateTime<Utc>,
}
535
/// In-memory representation of the set of references associated with a source.
///
/// Converts to and from `mz_sql::plan::SourceReferences` and from [`durable::SourceReferences`];
/// converting to the durable form additionally needs the source's ID (see `to_durable`).
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
    /// When the references were last updated. NOTE(review): unit/epoch of this `u64` is not
    /// visible here; confirm against the producer.
    pub updated_at: u64,
    /// The individual references.
    pub references: Vec<SourceReference>,
}
541
/// In-memory representation of a single reference associated with a source.
///
/// Carries the same three fields as [`durable::SourceReference`] and
/// `mz_sql::plan::SourceReference`, and converts to and from both.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
    /// Name of the reference.
    pub name: String,
    /// Namespace of the reference, if any.
    pub namespace: Option<String>,
    /// Column names of the reference.
    pub columns: Vec<String>,
}
548
549impl From<SourceReference> for durable::SourceReference {
550    fn from(source_reference: SourceReference) -> durable::SourceReference {
551        durable::SourceReference {
552            name: source_reference.name,
553            namespace: source_reference.namespace,
554            columns: source_reference.columns,
555        }
556    }
557}
558
559impl SourceReferences {
560    pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
561        durable::SourceReferences {
562            source_id,
563            updated_at: self.updated_at,
564            references: self.references.into_iter().map(Into::into).collect(),
565        }
566    }
567}
568
569impl From<durable::SourceReference> for SourceReference {
570    fn from(source_reference: durable::SourceReference) -> SourceReference {
571        SourceReference {
572            name: source_reference.name,
573            namespace: source_reference.namespace,
574            columns: source_reference.columns,
575        }
576    }
577}
578
579impl From<durable::SourceReferences> for SourceReferences {
580    fn from(source_references: durable::SourceReferences) -> SourceReferences {
581        SourceReferences {
582            updated_at: source_references.updated_at,
583            references: source_references
584                .references
585                .into_iter()
586                .map(|source_reference| source_reference.into())
587                .collect(),
588        }
589    }
590}
591
592impl From<mz_sql::plan::SourceReference> for SourceReference {
593    fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
594        SourceReference {
595            name: source_reference.name,
596            namespace: source_reference.namespace,
597            columns: source_reference.columns,
598        }
599    }
600}
601
602impl From<mz_sql::plan::SourceReferences> for SourceReferences {
603    fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
604        SourceReferences {
605            updated_at: source_references.updated_at,
606            references: source_references
607                .references
608                .into_iter()
609                .map(|source_reference| source_reference.into())
610                .collect(),
611        }
612    }
613}
614
615impl From<SourceReferences> for mz_sql::plan::SourceReferences {
616    fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
617        mz_sql::plan::SourceReferences {
618            updated_at: source_references.updated_at,
619            references: source_references
620                .references
621                .into_iter()
622                .map(|source_reference| source_reference.into())
623                .collect(),
624        }
625    }
626}
627
628impl From<SourceReference> for mz_sql::plan::SourceReference {
629    fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
630        mz_sql::plan::SourceReference {
631            name: source_reference.name,
632            namespace: source_reference.namespace,
633            columns: source_reference.columns,
634        }
635    }
636}
637
/// An item in the in-memory catalog together with its identity, ownership and dependency
/// bookkeeping.
///
/// Converts into [`durable::Item`]; `referenced_by` and `used_by` are not part of that
/// conversion and are also skipped during serialization.
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    /// The item itself.
    pub item: CatalogItem,
    /// IDs of catalog items recorded as referencing this entry.
    #[serde(skip)]
    pub referenced_by: Vec<CatalogItemId>,
    // TODO(database-issues#7922): this should have an invariant tied to it that all
    // dependents (i.e. entries in this field) have IDs greater than this
    // entry's ID.
    /// IDs of catalog items recorded as using (depending on) this entry.
    #[serde(skip)]
    pub used_by: Vec<CatalogItemId>,
    /// ID of this entry.
    pub id: CatalogItemId,
    /// OID of this entry.
    pub oid: u32,
    /// Qualified name of this entry.
    pub name: QualifiedItemName,
    /// Role that owns this entry.
    pub owner_id: RoleId,
    /// Privileges associated with this entry.
    pub privileges: PrivilegeMap,
}
654
/// A [`CatalogEntry`] that is associated with a specific "collection" of data.
/// A single item in the catalog may be associated with multiple "collections".
///
/// Here "collection" generally means a pTVC, e.g. a Persist Shard, an Index, a
/// currently running dataflow, etc.
///
/// Items in the Catalog have a stable name -> ID mapping, in other words for
/// the entire lifetime of an object its [`CatalogItemId`] will _never_ change.
/// Similarly, we need to maintain a stable mapping from [`GlobalId`] to pTVC.
/// This presents a challenge when `ALTER`-ing an object, e.g. adding columns
/// to a table. We can't just change the schema of the underlying Persist Shard
/// because that would be rebinding the [`GlobalId`] of the pTVC. Instead we
/// allocate a new [`GlobalId`] to refer to the new version of the table, and
/// then the [`CatalogEntry`] tracks the [`GlobalId`] for each version.
///
/// TODO(ct): Add a note here if we end up using this for associating continual
/// tasks with a single catalog item.
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
    /// The underlying catalog entry; also reachable through `Deref`.
    pub entry: CatalogEntry,
    /// Selects which version of the entry's relation this value refers to. It is passed to
    /// the item when resolving the relation description and the [`GlobalId`].
    pub version: RelationVersionSelector,
}
677
678impl CatalogCollectionEntry {
679    pub fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
680        self.item().relation_desc(self.version)
681    }
682}
683
684impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
685    fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
686        self.item().relation_desc(self.version)
687    }
688
689    fn global_id(&self) -> GlobalId {
690        self.entry
691            .item()
692            .global_id_for_version(self.version)
693            .expect("catalog corruption, missing version!")
694    }
695}
696
697impl Deref for CatalogCollectionEntry {
698    type Target = CatalogEntry;
699
700    fn deref(&self) -> &CatalogEntry {
701        &self.entry
702    }
703}
704
// Implements the SQL-layer catalog item interface by forwarding to the wrapped
// [`CatalogEntry`]. The only method that involves `self.version` is `at_version`, which
// produces a copy of this entry pinned to a different version.
impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
    fn name(&self) -> &QualifiedItemName {
        self.entry.name()
    }

    fn id(&self) -> CatalogItemId {
        self.entry.id()
    }

    // Returns the global IDs reported by the wrapped entry, not just the one for
    // `self.version`.
    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.entry.global_ids())
    }

    fn oid(&self) -> u32 {
        self.entry.oid()
    }

    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.entry.func()
    }

    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.entry.source_desc()
    }

    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        // Called through the trait explicitly. NOTE(review): presumably `CatalogEntry` also has
        // an inherent `connection` method with a different signature that would otherwise be
        // picked; confirm against its definition.
        mz_sql::catalog::CatalogItem::connection(&self.entry)
    }

    fn create_sql(&self) -> &str {
        self.entry.create_sql()
    }

    fn item_type(&self) -> SqlCatalogItemType {
        self.entry.item_type()
    }

    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        self.entry.index_details()
    }

    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        self.entry.writable_table_details()
    }

    fn replacement_target(&self) -> Option<CatalogItemId> {
        self.entry.replacement_target()
    }

    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        self.entry.type_details()
    }

    fn references(&self) -> &ResolvedIds {
        self.entry.references()
    }

    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.entry.uses()
    }

    fn referenced_by(&self) -> &[CatalogItemId] {
        self.entry.referenced_by()
    }

    fn used_by(&self) -> &[CatalogItemId] {
        self.entry.used_by()
    }

    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.entry.subsource_details()
    }

    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.entry.source_export_details()
    }

    fn is_progress_source(&self) -> bool {
        self.entry.is_progress_source()
    }

    fn progress_id(&self) -> Option<CatalogItemId> {
        self.entry.progress_id()
    }

    fn owner_id(&self) -> RoleId {
        // The entry hands back a reference; the trait wants the ID by value.
        *self.entry.owner_id()
    }

    fn privileges(&self) -> &PrivilegeMap {
        self.entry.privileges()
    }

    fn cluster_id(&self) -> Option<ClusterId> {
        self.entry.item().cluster_id()
    }

    // Clones the wrapped entry and pairs the clone with the requested `version`; `self` is
    // left unchanged.
    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        Box::new(CatalogCollectionEntry {
            entry: self.entry.clone(),
            version,
        })
    }

    fn latest_version(&self) -> Option<RelationVersion> {
        self.entry.latest_version()
    }
}
828
/// The kinds of item an in-memory [`CatalogEntry`] can hold. Each variant wraps the struct
/// of the same name.
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    /// A table.
    Table(Table),
    /// A source.
    Source(Source),
    /// A log.
    Log(Log),
    /// A view.
    View(View),
    /// A materialized view.
    MaterializedView(MaterializedView),
    /// A sink.
    Sink(Sink),
    /// An index.
    Index(Index),
    /// A type.
    Type(Type),
    /// A function.
    Func(Func),
    /// A secret.
    Secret(Secret),
    /// A connection.
    Connection(Connection),
    /// A continual task.
    ContinualTask(ContinualTask),
}
844
845impl From<CatalogEntry> for durable::Item {
846    fn from(entry: CatalogEntry) -> durable::Item {
847        let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
848        durable::Item {
849            id: entry.id,
850            oid: entry.oid,
851            global_id,
852            schema_id: entry.name.qualifiers.schema_spec.into(),
853            name: entry.name.item,
854            create_sql,
855            owner_id: entry.owner_id,
856            privileges: entry.privileges.into_all_values().collect(),
857            extra_versions,
858        }
859    }
860}
861
/// In-memory catalog representation of a table.
///
/// A table may exist in several versions; each version has its own [`GlobalId`] in
/// `collections`, and `desc` can produce the [`RelationDesc`] for any of them.
#[derive(Debug, Clone, Serialize)]
pub struct Table {
    /// Parse-able SQL that defines this table.
    pub create_sql: Option<String>,
    /// [`VersionedRelationDesc`] of this table, derived from the `create_sql`.
    pub desc: VersionedRelationDesc,
    /// Versions of this table, and the [`GlobalId`]s that refer to them.
    ///
    /// Expected to hold at least one entry: [`Table::global_id_writes`] panics when it is
    /// empty.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this table, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the table's logical compaction window is controlled by the ['metrics_retention']
    /// session variable.
    ///
    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
    /// Where data for this table comes from, e.g. `INSERT` statements or an upstream source.
    pub data_source: TableDataSource,
}
886
887impl Table {
888    pub fn timeline(&self) -> Timeline {
889        match &self.data_source {
890            // The Coordinator controls insertions for writable tables
891            // (including system tables), so they are realtime.
892            TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
893            TableDataSource::DataSource { timeline, .. } => timeline.clone(),
894        }
895    }
896
897    /// Returns all of the [`GlobalId`]s that this [`Table`] can be referenced by.
898    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
899        self.collections.values().copied()
900    }
901
902    /// Returns the latest [`GlobalId`] for this [`Table`] which should be used for writes.
903    pub fn global_id_writes(&self) -> GlobalId {
904        *self
905            .collections
906            .last_key_value()
907            .expect("at least one version of a table")
908            .1
909    }
910
911    /// Returns all of the collections and their [`RelationDesc`]s associated with this [`Table`].
912    pub fn collection_descs(
913        &self,
914    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
915        self.collections.iter().map(|(version, gid)| {
916            let desc = self
917                .desc
918                .at_version(RelationVersionSelector::Specific(*version));
919            (*gid, *version, desc)
920        })
921    }
922
923    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
924    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
925        let (version, _gid) = self
926            .collections
927            .iter()
928            .find(|(_version, gid)| *gid == id)
929            .expect("GlobalId to exist");
930        self.desc
931            .at_version(RelationVersionSelector::Specific(*version))
932    }
933}
934
/// Describes where the data of a [`Table`] comes from.
#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
    /// The table owns data created via INSERT/UPDATE/DELETE statements.
    TableWrites {
        /// Default-value expressions for the table. Skipped during serialization.
        /// NOTE(review): presumably one expression per column, in column order; confirm
        /// against the code that constructs this variant.
        #[serde(skip)]
        defaults: Vec<Expr<Aug>>,
    },

    /// The table receives its data from the identified `DataSourceDesc`.
    /// This table type does not support INSERT/UPDATE/DELETE statements.
    DataSource {
        /// Description of the data source feeding the table.
        desc: DataSourceDesc,
        /// Timeline of the data source; this is what [`Table::timeline`] returns for the
        /// variant.
        timeline: Timeline,
    },
}
950
/// Describes how a source (or a table backed by a data source) obtains its data.
#[derive(Debug, Clone, Serialize)]
pub enum DataSourceDesc {
    /// Receives data from an external system
    Ingestion {
        /// Description of the ingestion.
        desc: SourceDesc<ReferencedConnection>,
        /// The cluster which this ingestion is associated with.
        cluster_id: ClusterId,
    },
    /// Receives data from an external system, for an ingestion created with the old syntax.
    /// Unlike [`DataSourceDesc::Ingestion`], this variant also carries the export configuration
    /// (`data_config` and `details`) itself.
    OldSyntaxIngestion {
        /// Description of the ingestion.
        desc: SourceDesc<ReferencedConnection>,
        /// The cluster which this ingestion is associated with.
        cluster_id: ClusterId,
        // If we're dealing with an old syntax ingestion the progress id will be some other collection
        // and the ingestion itself will have the data from an external reference
        progress_subsource: CatalogItemId,
        /// Data configuration of the export; supplies the encoding and envelope of this source.
        data_config: SourceExportDataConfig<ReferencedConnection>,
        /// Source-specific details of the export.
        details: SourceExportDetails,
    },
    /// This source receives its data from the identified ingestion,
    /// specifically the output identified by `external_reference`.
    /// N.B. that `external_reference` should not be used to identify
    /// anything downstream of purification, as the purification process
    /// encodes source-specific identifiers into the `details` struct.
    /// The `external_reference` field is only used here for displaying
    /// human-readable names in system tables.
    IngestionExport {
        /// The ingestion this export receives its data from.
        ingestion_id: CatalogItemId,
        /// Human-readable name of the upstream object; for display only (see above).
        external_reference: UnresolvedItemName,
        /// Source-specific details of the export.
        details: SourceExportDetails,
        /// Data configuration of the export; supplies the encoding and envelope of this source.
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// Receives introspection data from an internal system
    Introspection(IntrospectionType),
    /// Receives data from the source's reclocking/remapping operations.
    Progress,
    /// Receives data from HTTP requests.
    Webhook {
        /// Optional components used to validate a webhook request.
        validate_using: Option<WebhookValidation>,
        /// Describes how we deserialize the body of a webhook request.
        body_format: WebhookBodyFormat,
        /// Describes whether or not to include headers and how to map them.
        headers: WebhookHeaders,
        /// The cluster which this source is associated with.
        cluster_id: ClusterId,
    },
}
997
998impl DataSourceDesc {
999    /// The key and value formats of the data source.
1000    pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1001        match &self {
1002            DataSourceDesc::Ingestion { .. } => (None, None),
1003            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1004                match &data_config.encoding.as_ref() {
1005                    Some(encoding) => match &encoding.key {
1006                        Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1007                        None => (None, Some(encoding.value.type_())),
1008                    },
1009                    None => (None, None),
1010                }
1011            }
1012            DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1013                Some(encoding) => match &encoding.key {
1014                    Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1015                    None => (None, Some(encoding.value.type_())),
1016                },
1017                None => (None, None),
1018            },
1019            DataSourceDesc::Introspection(_)
1020            | DataSourceDesc::Webhook { .. }
1021            | DataSourceDesc::Progress => (None, None),
1022        }
1023    }
1024
1025    /// Envelope of the data source.
1026    pub fn envelope(&self) -> Option<&str> {
1027        // Note how "none"/"append-only" is different from `None`. Source
1028        // sources don't have an envelope (internal logs, for example), while
1029        // other sources have an envelope that we call the "NONE"-envelope.
1030
1031        fn envelope_string(envelope: &SourceEnvelope) -> &str {
1032            match envelope {
1033                SourceEnvelope::None(_) => "none",
1034                SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1035                    mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1036                    mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1037                        // NOTE(aljoscha): Should we somehow mark that this is
1038                        // using upsert internally? See note above about
1039                        // DEBEZIUM.
1040                        "debezium"
1041                    }
1042                    mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1043                        "upsert-value-err-inline"
1044                    }
1045                },
1046                SourceEnvelope::CdcV2 => {
1047                    // TODO(aljoscha): Should we even report this? It's
1048                    // currently not exposed.
1049                    "materialize"
1050                }
1051            }
1052        }
1053
1054        match self {
1055            // NOTE(aljoscha): We could move the block for ingestions into
1056            // `SourceEnvelope` itself, but that one feels more like an internal
1057            // thing and adapter should own how we represent envelopes as a
1058            // string? It would not be hard to convince me otherwise, though.
1059            DataSourceDesc::Ingestion { .. } => None,
1060            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1061                Some(envelope_string(&data_config.envelope))
1062            }
1063            DataSourceDesc::IngestionExport { data_config, .. } => {
1064                Some(envelope_string(&data_config.envelope))
1065            }
1066            DataSourceDesc::Introspection(_)
1067            | DataSourceDesc::Webhook { .. }
1068            | DataSourceDesc::Progress => None,
1069        }
1070    }
1071}
1072
/// In-memory catalog representation of a source.
#[derive(Debug, Clone, Serialize)]
pub struct Source {
    /// Parse-able SQL that defines this source.
    pub create_sql: Option<String>,
    /// [`GlobalId`] used to reference this source from outside the catalog.
    pub global_id: GlobalId,
    /// Describes where this source receives its data from. Not serialized.
    // TODO: Unskip: currently blocked on some inner BTreeMap<X, _> problems.
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    /// [`RelationDesc`] of this source, derived from the `create_sql`.
    pub desc: RelationDesc,
    /// The timeline this source exists on.
    pub timeline: Timeline,
    /// Other catalog objects referenced by this source, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// This value is ignored for subsources, i.e. for
    /// [`DataSourceDesc::IngestionExport`]. Instead, it uses the primary
    /// source's logical compaction window.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the source's logical compaction window is controlled by
    /// METRICS_RETENTION
    pub is_retained_metrics_object: bool,
}
1096
1097impl Source {
1098    /// Creates a new `Source`.
1099    ///
1100    /// # Panics
1101    /// - If an ingestion-based plan is not given a cluster_id.
1102    /// - If a non-ingestion-based source has a defined cluster config in its plan.
1103    /// - If a non-ingestion-based source is given a cluster_id.
1104    pub fn new(
1105        plan: CreateSourcePlan,
1106        global_id: GlobalId,
1107        resolved_ids: ResolvedIds,
1108        custom_logical_compaction_window: Option<CompactionWindow>,
1109        is_retained_metrics_object: bool,
1110    ) -> Source {
1111        Source {
1112            create_sql: Some(plan.source.create_sql),
1113            data_source: match plan.source.data_source {
1114                mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1115                    desc,
1116                    cluster_id: plan
1117                        .in_cluster
1118                        .expect("ingestion-based sources must be given a cluster ID"),
1119                },
1120                mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1121                    desc,
1122                    progress_subsource,
1123                    data_config,
1124                    details,
1125                } => DataSourceDesc::OldSyntaxIngestion {
1126                    desc,
1127                    cluster_id: plan
1128                        .in_cluster
1129                        .expect("ingestion-based sources must be given a cluster ID"),
1130                    progress_subsource,
1131                    data_config,
1132                    details,
1133                },
1134                mz_sql::plan::DataSourceDesc::Progress => {
1135                    assert!(
1136                        plan.in_cluster.is_none(),
1137                        "subsources must not have a host config or cluster_id defined"
1138                    );
1139                    DataSourceDesc::Progress
1140                }
1141                mz_sql::plan::DataSourceDesc::IngestionExport {
1142                    ingestion_id,
1143                    external_reference,
1144                    details,
1145                    data_config,
1146                } => {
1147                    assert!(
1148                        plan.in_cluster.is_none(),
1149                        "subsources must not have a host config or cluster_id defined"
1150                    );
1151                    DataSourceDesc::IngestionExport {
1152                        ingestion_id,
1153                        external_reference,
1154                        details,
1155                        data_config,
1156                    }
1157                }
1158                mz_sql::plan::DataSourceDesc::Webhook {
1159                    validate_using,
1160                    body_format,
1161                    headers,
1162                    cluster_id,
1163                } => {
1164                    mz_ore::soft_assert_or_log!(
1165                        cluster_id.is_none(),
1166                        "cluster_id set at Source level for Webhooks"
1167                    );
1168                    DataSourceDesc::Webhook {
1169                        validate_using,
1170                        body_format,
1171                        headers,
1172                        cluster_id: plan
1173                            .in_cluster
1174                            .expect("webhook sources must be given a cluster ID"),
1175                    }
1176                }
1177            },
1178            desc: plan.source.desc,
1179            global_id,
1180            timeline: plan.timeline,
1181            resolved_ids,
1182            custom_logical_compaction_window: plan
1183                .source
1184                .compaction_window
1185                .or(custom_logical_compaction_window),
1186            is_retained_metrics_object,
1187        }
1188    }
1189
1190    /// Type of the source.
1191    pub fn source_type(&self) -> &str {
1192        match &self.data_source {
1193            DataSourceDesc::Ingestion { desc, .. }
1194            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
1195            DataSourceDesc::Progress => "progress",
1196            DataSourceDesc::IngestionExport { .. } => "subsource",
1197            DataSourceDesc::Introspection(_) => "source",
1198            DataSourceDesc::Webhook { .. } => "webhook",
1199        }
1200    }
1201
1202    /// Connection ID of the source, if one exists.
1203    pub fn connection_id(&self) -> Option<CatalogItemId> {
1204        match &self.data_source {
1205            DataSourceDesc::Ingestion { desc, .. }
1206            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
1207            DataSourceDesc::IngestionExport { .. }
1208            | DataSourceDesc::Introspection(_)
1209            | DataSourceDesc::Webhook { .. }
1210            | DataSourceDesc::Progress => None,
1211        }
1212    }
1213
1214    /// The single [`GlobalId`] that refers to this Source.
1215    pub fn global_id(&self) -> GlobalId {
1216        self.global_id
1217    }
1218
1219    /// The expensive resource that each source consumes is persist shards. To
1220    /// prevent abuse, we want to prevent users from creating sources that use an
1221    /// unbounded number of persist shards. But we also don't want to count
1222    /// persist shards that are mandated by teh system (e.g., the progress
1223    /// shard) so that future versions of Materialize can introduce additional
1224    /// per-source shards (e.g., a per-source status shard) without impacting
1225    /// the limit calculation.
1226    pub fn user_controllable_persist_shard_count(&self) -> i64 {
1227        match &self.data_source {
1228            DataSourceDesc::Ingestion { .. } => 0,
1229            DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1230                match &desc.connection {
1231                    // These multi-output sources do not use their primary
1232                    // source's data shard, so we don't include it in accounting
1233                    // for users.
1234                    GenericSourceConnection::Postgres(_)
1235                    | GenericSourceConnection::MySql(_)
1236                    | GenericSourceConnection::SqlServer(_) => 0,
1237                    GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
1238                        // Load generators that output data in their primary shard
1239                        LoadGenerator::Clock
1240                        | LoadGenerator::Counter { .. }
1241                        | LoadGenerator::Datums
1242                        | LoadGenerator::KeyValue(_) => 1,
1243                        LoadGenerator::Auction
1244                        | LoadGenerator::Marketing
1245                        | LoadGenerator::Tpch { .. } => 0,
1246                    },
1247                    GenericSourceConnection::Kafka(_) => 1,
1248                }
1249            }
1250            //  DataSourceDesc::IngestionExport represents a subsource, which
1251            //  use a data shard.
1252            DataSourceDesc::IngestionExport { .. } => 1,
1253            DataSourceDesc::Webhook { .. } => 1,
1254            // Introspection and progress subsources are not under the user's control, so shouldn't
1255            // count toward their quota.
1256            DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => 0,
1257        }
1258    }
1259}
1260
/// In-memory catalog representation of a log.
#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// The category of data this log stores.
    pub variant: LogVariant,
    /// [`GlobalId`] used to reference this log from outside the catalog.
    pub global_id: GlobalId,
}
1268
1269impl Log {
1270    /// The single [`GlobalId`] that refers to this Log.
1271    pub fn global_id(&self) -> GlobalId {
1272        self.global_id
1273    }
1274}
1275
/// In-memory catalog representation of a sink.
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    /// Parse-able SQL that defines this sink.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this sink from outside the catalog, e.g storage.
    pub global_id: GlobalId,
    /// Collection we read into this sink.
    pub from: GlobalId,
    /// Connection to the external service we're sinking into, e.g. Kafka.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// Envelope we use to sink into the external system.
    ///
    /// TODO(guswynn): this probably should just be in the `connection`.
    pub envelope: SinkEnvelope,
    /// Emit an initial snapshot into the sink.
    pub with_snapshot: bool,
    /// Used to fence other writes into this sink as we evolve the upstream materialized view.
    pub version: u64,
    /// Other catalog objects this sink references.
    pub resolved_ids: ResolvedIds,
    /// Cluster this sink runs on.
    pub cluster_id: ClusterId,
    /// Commit interval for the sink.
    pub commit_interval: Option<Duration>,
}
1301
1302impl Sink {
1303    pub fn sink_type(&self) -> &str {
1304        self.connection.name()
1305    }
1306
1307    /// Envelope of the sink.
1308    pub fn envelope(&self) -> Option<&str> {
1309        match &self.envelope {
1310            SinkEnvelope::Debezium => Some("debezium"),
1311            SinkEnvelope::Upsert => Some("upsert"),
1312        }
1313    }
1314
1315    /// Output a combined format string of the sink. For legacy reasons
1316    /// if the key-format is none or the key & value formats are
1317    /// both the same (either avro or json), we return the value format name,
1318    /// otherwise we return a composite name.
1319    pub fn combined_format(&self) -> Option<Cow<'_, str>> {
1320        match &self.connection {
1321            StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
1322            _ => None,
1323        }
1324    }
1325
1326    /// Output distinct key_format and value_format of the sink.
1327    pub fn formats(&self) -> Option<(Option<&str>, &str)> {
1328        match &self.connection {
1329            StorageSinkConnection::Kafka(connection) => {
1330                let key_format = connection
1331                    .format
1332                    .key_format
1333                    .as_ref()
1334                    .map(|f| f.get_format_name());
1335                let value_format = connection.format.value_format.get_format_name();
1336                Some((key_format, value_format))
1337            }
1338            _ => None,
1339        }
1340    }
1341
1342    pub fn connection_id(&self) -> Option<CatalogItemId> {
1343        self.connection.connection_id()
1344    }
1345
1346    /// The single [`GlobalId`] that this Sink can be referenced by.
1347    pub fn global_id(&self) -> GlobalId {
1348        self.global_id
1349    }
1350}
1351
/// In-memory catalog representation of a (non-materialized) view.
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// Parse-able SQL that defines this view.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this view from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// Unoptimized high-level expression from parsing the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression from (locally) optimizing the `raw_expr`.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Columns of this view.
    pub desc: RelationDesc,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects that are referenced by this view, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this view.
    pub dependencies: DependencyIds,
}
1371
1372impl View {
1373    /// The single [`GlobalId`] this [`View`] can be referenced by.
1374    pub fn global_id(&self) -> GlobalId {
1375        self.global_id
1376    }
1377}
1378
/// In-memory catalog representation of a materialized view.
///
/// A materialized view can exist in several versions, each backed by its own [`GlobalId`]; see
/// `collections`.
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// Parse-able SQL that defines this materialized view.
    pub create_sql: String,
    /// Versions of this materialized view, and the [`GlobalId`]s that refer to them.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// Raw high-level expression from planning, derived from the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression, derived from the `raw_expr`.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// [`VersionedRelationDesc`] of this materialized view, derived from the `create_sql`.
    pub desc: VersionedRelationDesc,
    /// Other catalog items that this materialized view references, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this view.
    pub dependencies: DependencyIds,
    /// ID of the materialized view this materialized view is intended to replace.
    pub replacement_target: Option<CatalogItemId>,
    /// Cluster that this materialized view runs on.
    pub cluster_id: ClusterId,
    /// Column indexes that we assert are not `NULL`.
    ///
    /// TODO(parkmycar): Switch this to use the `ColumnIdx` type.
    pub non_null_assertions: Vec<usize>,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Schedule to refresh this materialized view, e.g. set via `REFRESH EVERY` option.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// The initial `as_of` of the storage collection associated with the materialized view.
    ///
    /// Note: This doesn't change upon restarts.
    /// (The dataflow's initial `as_of` can be different.)
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1414
1415impl MaterializedView {
1416    /// Returns all [`GlobalId`]s that this [`MaterializedView`] can be referenced by.
1417    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1418        self.collections.values().copied()
1419    }
1420
1421    /// The latest [`GlobalId`] for this [`MaterializedView`] which represents the writing
1422    /// version.
1423    pub fn global_id_writes(&self) -> GlobalId {
1424        *self
1425            .collections
1426            .last_key_value()
1427            .expect("at least one version of a materialized view")
1428            .1
1429    }
1430
1431    /// Returns all collections and their [`RelationDesc`]s associated with this [`MaterializedView`].
1432    pub fn collection_descs(
1433        &self,
1434    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
1435        self.collections.iter().map(|(version, gid)| {
1436            let desc = self
1437                .desc
1438                .at_version(RelationVersionSelector::Specific(*version));
1439            (*gid, *version, desc)
1440        })
1441    }
1442
1443    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
1444    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
1445        let (version, _gid) = self
1446            .collections
1447            .iter()
1448            .find(|(_version, gid)| *gid == id)
1449            .expect("GlobalId to exist");
1450        self.desc
1451            .at_version(RelationVersionSelector::Specific(*version))
1452    }
1453
1454    /// Apply the given replacement materialized view to this [`MaterializedView`].
1455    pub fn apply_replacement(&mut self, replacement: Self) {
1456        let target_id = replacement
1457            .replacement_target
1458            .expect("replacement has target");
1459
1460        fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
1461            let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
1462                panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
1463            });
1464            if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
1465                cmvs
1466            } else {
1467                panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
1468            }
1469        }
1470
1471        let old_stmt = parse(&self.create_sql);
1472        let rpl_stmt = parse(&replacement.create_sql);
1473        let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
1474            if_exists: old_stmt.if_exists,
1475            name: old_stmt.name,
1476            columns: rpl_stmt.columns,
1477            replacing: None,
1478            in_cluster: rpl_stmt.in_cluster,
1479            query: rpl_stmt.query,
1480            as_of: rpl_stmt.as_of,
1481            with_options: rpl_stmt.with_options,
1482        };
1483        let create_sql = new_stmt.to_ast_string_stable();
1484
1485        let mut collections = std::mem::take(&mut self.collections);
1486        // Note: We can't use `self.desc.latest_version` here because a replacement doesn't
1487        // necessary evolve the relation schema, so that version might be lower than the actual
1488        // latest version.
1489        let latest_version = collections.keys().max().expect("at least one version");
1490        let new_version = latest_version.bump();
1491        collections.insert(new_version, replacement.global_id_writes());
1492
1493        let mut resolved_ids = replacement.resolved_ids;
1494        resolved_ids.remove_item(&target_id);
1495        let mut dependencies = replacement.dependencies;
1496        dependencies.0.remove(&target_id);
1497
1498        *self = Self {
1499            create_sql,
1500            collections,
1501            raw_expr: replacement.raw_expr,
1502            optimized_expr: replacement.optimized_expr,
1503            desc: replacement.desc,
1504            resolved_ids,
1505            dependencies,
1506            replacement_target: None,
1507            cluster_id: replacement.cluster_id,
1508            non_null_assertions: replacement.non_null_assertions,
1509            custom_logical_compaction_window: replacement.custom_logical_compaction_window,
1510            refresh_schedule: replacement.refresh_schedule,
1511            initial_as_of: replacement.initial_as_of,
1512        };
1513    }
1514}
1515
/// In-memory catalog representation of an index.
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// Parse-able SQL that defines this index.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this index from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// The [`GlobalId`] this Index is on.
    pub on: GlobalId,
    /// Keys of the index.
    pub keys: Arc<[MirScalarExpr]>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this index, e.g. the object we're indexing.
    pub resolved_ids: ResolvedIds,
    /// Cluster this index is installed on.
    pub cluster_id: ClusterId,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the index's logical compaction window is controlled by the ['metrics_retention']
    /// session variable.
    ///
    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
}
1540
1541impl Index {
1542    /// The [`GlobalId`] that refers to this Index.
1543    pub fn global_id(&self) -> GlobalId {
1544        self.global_id
1545    }
1546}
1547
/// In-memory catalog representation of a type.
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// Parse-able SQL that defines this type.
    pub create_sql: Option<String>,
    /// [`GlobalId`] used to reference this type from outside the catalog.
    pub global_id: GlobalId,
    /// Details of this type. Not serialized.
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// Other catalog objects referenced by this type.
    pub resolved_ids: ResolvedIds,
}
1559
/// In-memory catalog representation of a function.
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// Static definition of the function. Not serialized.
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
    /// [`GlobalId`] used to reference this function from outside the catalog.
    pub global_id: GlobalId,
}
1568
/// In-memory catalog representation of a secret.
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// Parse-able SQL that defines this secret.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this secret from outside the catalog.
    pub global_id: GlobalId,
}
1576
/// In-memory catalog representation of a connection.
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// Parse-able SQL that defines this connection.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this connection from the storage layer.
    pub global_id: GlobalId,
    /// The kind of connection.
    pub details: ConnectionDetails,
    /// Other objects this connection depends on.
    pub resolved_ids: ResolvedIds,
}
1588
1589impl Connection {
1590    /// The single [`GlobalId`] used to reference this connection.
1591    pub fn global_id(&self) -> GlobalId {
1592        self.global_id
1593    }
1594}
1595
/// In-memory catalog representation of a continual task.
#[derive(Debug, Clone, Serialize)]
pub struct ContinualTask {
    /// Parse-able SQL that defines this continual task.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this continual task from outside the catalog.
    pub global_id: GlobalId,
    /// [`GlobalId`] of the collection that we read into this continual task.
    pub input_id: GlobalId,
    /// NOTE(review): presumably whether an initial snapshot of the input is processed — confirm.
    pub with_snapshot: bool,
    /// ContinualTasks are self-referential. We make this work by using a
    /// placeholder `LocalId` for the CT itself through name resolution and
    /// planning. Then we fill in the real `GlobalId` before constructing this
    /// catalog item.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Columns for this continual task.
    pub desc: RelationDesc,
    /// Other catalog items that this continual task references, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this continual task.
    pub dependencies: DependencyIds,
    /// Cluster that this continual task runs on.
    pub cluster_id: ClusterId,
    /// See the comment on [MaterializedView::initial_as_of].
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1621
1622impl ContinualTask {
1623    /// The single [`GlobalId`] used to reference this continual task.
1624    pub fn global_id(&self) -> GlobalId {
1625        self.global_id
1626    }
1627}
1628
/// In-memory catalog representation of a network policy; convertible to and from
/// `durable::NetworkPolicy`.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
    /// Name of the network policy.
    pub name: String,
    /// ID of the network policy.
    pub id: NetworkPolicyId,
    /// OID of the network policy.
    pub oid: u32,
    /// Rules that make up the network policy.
    pub rules: Vec<NetworkPolicyRule>,
    /// Role that owns the network policy.
    pub owner_id: RoleId,
    /// Privileges on the network policy.
    pub privileges: PrivilegeMap,
}
1638
1639impl From<NetworkPolicy> for durable::NetworkPolicy {
1640    fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1641        durable::NetworkPolicy {
1642            id: policy.id,
1643            oid: policy.oid,
1644            name: policy.name,
1645            rules: policy.rules,
1646            owner_id: policy.owner_id,
1647            privileges: policy.privileges.into_all_values().collect(),
1648        }
1649    }
1650}
1651
1652impl From<durable::NetworkPolicy> for NetworkPolicy {
1653    fn from(
1654        durable::NetworkPolicy {
1655            id,
1656            oid,
1657            name,
1658            rules,
1659            owner_id,
1660            privileges,
1661        }: durable::NetworkPolicy,
1662    ) -> Self {
1663        NetworkPolicy {
1664            id,
1665            oid,
1666            name,
1667            rules,
1668            owner_id,
1669            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1670        }
1671    }
1672}
1673
1674impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1675    fn update_from(
1676        &mut self,
1677        durable::NetworkPolicy {
1678            id,
1679            oid,
1680            name,
1681            rules,
1682            owner_id,
1683            privileges,
1684        }: durable::NetworkPolicy,
1685    ) {
1686        self.id = id;
1687        self.oid = oid;
1688        self.name = name;
1689        self.rules = rules;
1690        self.owner_id = owner_id;
1691        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1692    }
1693}
1694
1695impl CatalogItem {
1696    /// Returns a string indicating the type of this catalog entry.
1697    pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1698        match self {
1699            CatalogItem::Table(_) => CatalogItemType::Table,
1700            CatalogItem::Source(_) => CatalogItemType::Source,
1701            CatalogItem::Log(_) => CatalogItemType::Source,
1702            CatalogItem::Sink(_) => CatalogItemType::Sink,
1703            CatalogItem::View(_) => CatalogItemType::View,
1704            CatalogItem::MaterializedView(_) => CatalogItemType::MaterializedView,
1705            CatalogItem::Index(_) => CatalogItemType::Index,
1706            CatalogItem::Type(_) => CatalogItemType::Type,
1707            CatalogItem::Func(_) => CatalogItemType::Func,
1708            CatalogItem::Secret(_) => CatalogItemType::Secret,
1709            CatalogItem::Connection(_) => CatalogItemType::Connection,
1710            CatalogItem::ContinualTask(_) => CatalogItemType::ContinualTask,
1711        }
1712    }
1713
1714    /// Returns the [`GlobalId`]s that reference this item, if any.
1715    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1716        let gid = match self {
1717            CatalogItem::Source(source) => source.global_id,
1718            CatalogItem::Log(log) => log.global_id,
1719            CatalogItem::Sink(sink) => sink.global_id,
1720            CatalogItem::View(view) => view.global_id,
1721            CatalogItem::MaterializedView(mv) => {
1722                return itertools::Either::Left(mv.collections.values().copied());
1723            }
1724            CatalogItem::ContinualTask(ct) => ct.global_id,
1725            CatalogItem::Index(index) => index.global_id,
1726            CatalogItem::Func(func) => func.global_id,
1727            CatalogItem::Type(ty) => ty.global_id,
1728            CatalogItem::Secret(secret) => secret.global_id,
1729            CatalogItem::Connection(conn) => conn.global_id,
1730            CatalogItem::Table(table) => {
1731                return itertools::Either::Left(table.collections.values().copied());
1732            }
1733        };
1734        itertools::Either::Right(std::iter::once(gid))
1735    }
1736
1737    /// Returns the most up-to-date [`GlobalId`] for this item.
1738    ///
1739    /// Note: The only type of object that can have multiple [`GlobalId`]s are tables.
1740    pub fn latest_global_id(&self) -> GlobalId {
1741        match self {
1742            CatalogItem::Source(source) => source.global_id,
1743            CatalogItem::Log(log) => log.global_id,
1744            CatalogItem::Sink(sink) => sink.global_id,
1745            CatalogItem::View(view) => view.global_id,
1746            CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
1747            CatalogItem::ContinualTask(ct) => ct.global_id,
1748            CatalogItem::Index(index) => index.global_id,
1749            CatalogItem::Func(func) => func.global_id,
1750            CatalogItem::Type(ty) => ty.global_id,
1751            CatalogItem::Secret(secret) => secret.global_id,
1752            CatalogItem::Connection(conn) => conn.global_id,
1753            CatalogItem::Table(table) => table.global_id_writes(),
1754        }
1755    }
1756
1757    /// Whether this item represents a storage collection.
1758    pub fn is_storage_collection(&self) -> bool {
1759        match self {
1760            CatalogItem::Table(_)
1761            | CatalogItem::Source(_)
1762            | CatalogItem::MaterializedView(_)
1763            | CatalogItem::Sink(_)
1764            | CatalogItem::ContinualTask(_) => true,
1765            CatalogItem::Log(_)
1766            | CatalogItem::View(_)
1767            | CatalogItem::Index(_)
1768            | CatalogItem::Type(_)
1769            | CatalogItem::Func(_)
1770            | CatalogItem::Secret(_)
1771            | CatalogItem::Connection(_) => false,
1772        }
1773    }
1774
1775    /// Returns the [`RelationDesc`] for items that yield rows, at the requested
1776    /// version.
1777    ///
1778    /// Some item types honor `version` so callers can ask for the schema that
1779    /// matches a specific [`GlobalId`] or historical definition. Other relation
1780    /// types ignore `version` because they have a single shape. Non-relational
1781    /// items ( for example functions, indexes, sinks, secrets, and connections)
1782    /// return `None`.
1783    pub fn relation_desc(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1784        match &self {
1785            CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1786            CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1787            CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1788            CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1789            CatalogItem::MaterializedView(mview) => {
1790                Some(Cow::Owned(mview.desc.at_version(version)))
1791            }
1792            CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
1793            CatalogItem::Func(_)
1794            | CatalogItem::Index(_)
1795            | CatalogItem::Sink(_)
1796            | CatalogItem::Secret(_)
1797            | CatalogItem::Connection(_)
1798            | CatalogItem::Type(_) => None,
1799        }
1800    }
1801
1802    pub fn func(
1803        &self,
1804        entry: &CatalogEntry,
1805    ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1806        match &self {
1807            CatalogItem::Func(func) => Ok(func.inner),
1808            _ => Err(SqlCatalogError::UnexpectedType {
1809                name: entry.name().item.to_string(),
1810                actual_type: entry.item_type(),
1811                expected_type: CatalogItemType::Func,
1812            }),
1813        }
1814    }
1815
1816    pub fn source_desc(
1817        &self,
1818        entry: &CatalogEntry,
1819    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1820        match &self {
1821            CatalogItem::Source(source) => match &source.data_source {
1822                DataSourceDesc::Ingestion { desc, .. }
1823                | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1824                DataSourceDesc::IngestionExport { .. }
1825                | DataSourceDesc::Introspection(_)
1826                | DataSourceDesc::Webhook { .. }
1827                | DataSourceDesc::Progress => Ok(None),
1828            },
1829            _ => Err(SqlCatalogError::UnexpectedType {
1830                name: entry.name().item.to_string(),
1831                actual_type: entry.item_type(),
1832                expected_type: CatalogItemType::Source,
1833            }),
1834        }
1835    }
1836
1837    /// Reports whether this catalog entry is a progress source.
1838    pub fn is_progress_source(&self) -> bool {
1839        matches!(
1840            self,
1841            CatalogItem::Source(Source {
1842                data_source: DataSourceDesc::Progress,
1843                ..
1844            })
1845        )
1846    }
1847
1848    /// Collects the identifiers of the objects that were encountered when resolving names in the
1849    /// item's DDL statement.
1850    pub fn references(&self) -> &ResolvedIds {
1851        static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1852        match self {
1853            CatalogItem::Func(_) => &*EMPTY,
1854            CatalogItem::Index(idx) => &idx.resolved_ids,
1855            CatalogItem::Sink(sink) => &sink.resolved_ids,
1856            CatalogItem::Source(source) => &source.resolved_ids,
1857            CatalogItem::Log(_) => &*EMPTY,
1858            CatalogItem::Table(table) => &table.resolved_ids,
1859            CatalogItem::Type(typ) => &typ.resolved_ids,
1860            CatalogItem::View(view) => &view.resolved_ids,
1861            CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1862            CatalogItem::Secret(_) => &*EMPTY,
1863            CatalogItem::Connection(connection) => &connection.resolved_ids,
1864            CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
1865        }
1866    }
1867
1868    /// Collects the identifiers of the objects used by this [`CatalogItem`].
1869    ///
1870    /// Like [`CatalogItem::references()`] but also includes objects that are not directly
1871    /// referenced. For example this will include any catalog objects used to implement functions
1872    /// and casts in the item.
1873    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1874        let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1875        match self {
1876            // TODO(jkosh44) This isn't really correct for functions. They may use other objects in
1877            // their implementation. However, currently there's no way to get that information.
1878            CatalogItem::Func(_) => {}
1879            CatalogItem::Index(_) => {}
1880            CatalogItem::Sink(_) => {}
1881            CatalogItem::Source(_) => {}
1882            CatalogItem::Log(_) => {}
1883            CatalogItem::Table(_) => {}
1884            CatalogItem::Type(_) => {}
1885            CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1886            CatalogItem::MaterializedView(mview) => {
1887                uses.extend(mview.dependencies.0.iter().copied())
1888            }
1889            CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
1890            CatalogItem::Secret(_) => {}
1891            CatalogItem::Connection(_) => {}
1892        }
1893        uses
1894    }
1895
1896    /// Returns the connection ID that this item belongs to, if this item is
1897    /// temporary.
1898    pub fn conn_id(&self) -> Option<&ConnectionId> {
1899        match self {
1900            CatalogItem::View(view) => view.conn_id.as_ref(),
1901            CatalogItem::Index(index) => index.conn_id.as_ref(),
1902            CatalogItem::Table(table) => table.conn_id.as_ref(),
1903            CatalogItem::Log(_)
1904            | CatalogItem::Source(_)
1905            | CatalogItem::Sink(_)
1906            | CatalogItem::MaterializedView(_)
1907            | CatalogItem::Secret(_)
1908            | CatalogItem::Type(_)
1909            | CatalogItem::Func(_)
1910            | CatalogItem::Connection(_)
1911            | CatalogItem::ContinualTask(_) => None,
1912        }
1913    }
1914
1915    /// Sets the connection ID that this item belongs to, which makes it a
1916    /// temporary item.
1917    pub fn set_conn_id(&mut self, conn_id: Option<ConnectionId>) {
1918        match self {
1919            CatalogItem::View(view) => view.conn_id = conn_id,
1920            CatalogItem::Index(index) => index.conn_id = conn_id,
1921            CatalogItem::Table(table) => table.conn_id = conn_id,
1922            CatalogItem::Log(_)
1923            | CatalogItem::Source(_)
1924            | CatalogItem::Sink(_)
1925            | CatalogItem::MaterializedView(_)
1926            | CatalogItem::Secret(_)
1927            | CatalogItem::Type(_)
1928            | CatalogItem::Func(_)
1929            | CatalogItem::Connection(_)
1930            | CatalogItem::ContinualTask(_) => (),
1931        }
1932    }
1933
1934    /// Indicates whether this item is temporary or not.
1935    pub fn is_temporary(&self) -> bool {
1936        self.conn_id().is_some()
1937    }
1938
1939    pub fn rename_schema_refs(
1940        &self,
1941        database_name: &str,
1942        cur_schema_name: &str,
1943        new_schema_name: &str,
1944    ) -> Result<CatalogItem, (String, String)> {
1945        let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
1946            let mut create_stmt = mz_sql::parse::parse(&create_sql)
1947                .expect("invalid create sql persisted to catalog")
1948                .into_element()
1949                .ast;
1950
1951            // Rename all references to cur_schema_name.
1952            mz_sql::ast::transform::create_stmt_rename_schema_refs(
1953                &mut create_stmt,
1954                database_name,
1955                cur_schema_name,
1956                new_schema_name,
1957            )?;
1958
1959            Ok(create_stmt.to_ast_string_stable())
1960        };
1961
1962        match self {
1963            CatalogItem::Table(i) => {
1964                let mut i = i.clone();
1965                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1966                Ok(CatalogItem::Table(i))
1967            }
1968            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1969            CatalogItem::Source(i) => {
1970                let mut i = i.clone();
1971                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1972                Ok(CatalogItem::Source(i))
1973            }
1974            CatalogItem::Sink(i) => {
1975                let mut i = i.clone();
1976                i.create_sql = do_rewrite(i.create_sql)?;
1977                Ok(CatalogItem::Sink(i))
1978            }
1979            CatalogItem::View(i) => {
1980                let mut i = i.clone();
1981                i.create_sql = do_rewrite(i.create_sql)?;
1982                Ok(CatalogItem::View(i))
1983            }
1984            CatalogItem::MaterializedView(i) => {
1985                let mut i = i.clone();
1986                i.create_sql = do_rewrite(i.create_sql)?;
1987                Ok(CatalogItem::MaterializedView(i))
1988            }
1989            CatalogItem::Index(i) => {
1990                let mut i = i.clone();
1991                i.create_sql = do_rewrite(i.create_sql)?;
1992                Ok(CatalogItem::Index(i))
1993            }
1994            CatalogItem::Secret(i) => {
1995                let mut i = i.clone();
1996                i.create_sql = do_rewrite(i.create_sql)?;
1997                Ok(CatalogItem::Secret(i))
1998            }
1999            CatalogItem::Connection(i) => {
2000                let mut i = i.clone();
2001                i.create_sql = do_rewrite(i.create_sql)?;
2002                Ok(CatalogItem::Connection(i))
2003            }
2004            CatalogItem::Type(i) => {
2005                let mut i = i.clone();
2006                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2007                Ok(CatalogItem::Type(i))
2008            }
2009            CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
2010            CatalogItem::ContinualTask(i) => {
2011                let mut i = i.clone();
2012                i.create_sql = do_rewrite(i.create_sql)?;
2013                Ok(CatalogItem::ContinualTask(i))
2014            }
2015        }
2016    }
2017
2018    /// Returns a clone of `self` with all instances of `from` renamed to `to`
2019    /// (with the option of including the item's own name) or errors if request
2020    /// is ambiguous.
2021    pub fn rename_item_refs(
2022        &self,
2023        from: FullItemName,
2024        to_item_name: String,
2025        rename_self: bool,
2026    ) -> Result<CatalogItem, String> {
2027        let do_rewrite = |create_sql: String| -> Result<String, String> {
2028            let mut create_stmt = mz_sql::parse::parse(&create_sql)
2029                .expect("invalid create sql persisted to catalog")
2030                .into_element()
2031                .ast;
2032            if rename_self {
2033                mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
2034            }
2035            // Determination of what constitutes an ambiguous request is done here.
2036            mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
2037            Ok(create_stmt.to_ast_string_stable())
2038        };
2039
2040        match self {
2041            CatalogItem::Table(i) => {
2042                let mut i = i.clone();
2043                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2044                Ok(CatalogItem::Table(i))
2045            }
2046            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
2047            CatalogItem::Source(i) => {
2048                let mut i = i.clone();
2049                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2050                Ok(CatalogItem::Source(i))
2051            }
2052            CatalogItem::Sink(i) => {
2053                let mut i = i.clone();
2054                i.create_sql = do_rewrite(i.create_sql)?;
2055                Ok(CatalogItem::Sink(i))
2056            }
2057            CatalogItem::View(i) => {
2058                let mut i = i.clone();
2059                i.create_sql = do_rewrite(i.create_sql)?;
2060                Ok(CatalogItem::View(i))
2061            }
2062            CatalogItem::MaterializedView(i) => {
2063                let mut i = i.clone();
2064                i.create_sql = do_rewrite(i.create_sql)?;
2065                Ok(CatalogItem::MaterializedView(i))
2066            }
2067            CatalogItem::Index(i) => {
2068                let mut i = i.clone();
2069                i.create_sql = do_rewrite(i.create_sql)?;
2070                Ok(CatalogItem::Index(i))
2071            }
2072            CatalogItem::Secret(i) => {
2073                let mut i = i.clone();
2074                i.create_sql = do_rewrite(i.create_sql)?;
2075                Ok(CatalogItem::Secret(i))
2076            }
2077            CatalogItem::Func(_) | CatalogItem::Type(_) => {
2078                unreachable!("{}s cannot be renamed", self.typ())
2079            }
2080            CatalogItem::Connection(i) => {
2081                let mut i = i.clone();
2082                i.create_sql = do_rewrite(i.create_sql)?;
2083                Ok(CatalogItem::Connection(i))
2084            }
2085            CatalogItem::ContinualTask(i) => {
2086                let mut i = i.clone();
2087                i.create_sql = do_rewrite(i.create_sql)?;
2088                Ok(CatalogItem::ContinualTask(i))
2089            }
2090        }
2091    }
2092
    /// Updates the retain history for an item. Returns the previous retain history value. Returns
    /// an error if this item does not support retain history.
    ///
    /// * `value`: when `Some`, the `RetainHistory` option of the item's `CREATE` statement is set
    ///   to `RetainHistoryFor(value)`, replacing the last existing occurrence or appending a new
    ///   option. When `None`, the last existing occurrence (if any) is removed.
    /// * `window`: stored as the item's custom logical compaction window once the SQL has been
    ///   rewritten. This happens for both `Some` and `None` values of `value`.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` if [`CatalogItem::update_sql`] fails or if the parsed statement is not a
    /// `CREATE TABLE`, `CREATE INDEX`, `CREATE SOURCE`, or `CREATE MATERIALIZED VIEW` statement.
    /// In that case the compaction window is left untouched.
    ///
    /// # Panics
    ///
    /// Panics if the SQL update succeeded but the item has no custom compaction window slot.
    pub fn update_retain_history(
        &mut self,
        value: Option<Value>,
        window: CompactionWindow,
    ) -> Result<Option<WithOptionValue<Raw>>, ()> {
        let update = |mut ast: &mut Statement<Raw>| {
            // Each statement type has unique option types. This macro handles them commonly.
            // It evaluates to the value of the option that was replaced or removed, if any.
            macro_rules! update_retain_history {
                ( $stmt:ident, $opt:ident, $name:ident ) => {{
                    // Replace or add the option.
                    let pos = $stmt
                        .with_options
                        .iter()
                        // In case there are ever multiple, look for the last one.
                        .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
                    if let Some(value) = value {
                        let next = mz_sql_parser::ast::$opt {
                            name: mz_sql_parser::ast::$name::RetainHistory,
                            value: Some(WithOptionValue::RetainHistoryFor(value)),
                        };
                        if let Some(idx) = pos {
                            // Overwrite in place, handing back what was there before.
                            let previous = $stmt.with_options[idx].clone();
                            $stmt.with_options[idx] = next;
                            previous.value
                        } else {
                            $stmt.with_options.push(next);
                            None
                        }
                    } else {
                        if let Some(idx) = pos {
                            // NOTE(review): assuming `with_options` is a `Vec`, `swap_remove`
                            // moves the last option into the freed slot, so the relative order
                            // of the remaining options may change.
                            $stmt.with_options.swap_remove(idx).value
                        } else {
                            None
                        }
                    }
                }};
            }
            // Only these four statement kinds are handled; anything else is rejected.
            let previous = match &mut ast {
                Statement::CreateTable(stmt) => {
                    update_retain_history!(stmt, TableOption, TableOptionName)
                }
                Statement::CreateIndex(stmt) => {
                    update_retain_history!(stmt, IndexOption, IndexOptionName)
                }
                Statement::CreateSource(stmt) => {
                    update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
                }
                Statement::CreateMaterializedView(stmt) => {
                    update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
                }
                _ => {
                    return Err(());
                }
            };
            Ok(previous)
        };

        // Rewrite the SQL first; on failure we bail out before touching the compaction window.
        let res = self.update_sql(update)?;
        let cw = self
            .custom_logical_compaction_window_mut()
            .expect("item must have compaction window");
        *cw = Some(window);
        Ok(res)
    }
2159
2160    pub fn add_column(
2161        &mut self,
2162        name: ColumnName,
2163        typ: SqlColumnType,
2164        sql: RawDataType,
2165    ) -> Result<RelationVersion, PlanError> {
2166        let CatalogItem::Table(table) = self else {
2167            return Err(PlanError::Unsupported {
2168                feature: "adding columns to a non-Table".to_string(),
2169                discussion_no: None,
2170            });
2171        };
2172        let next_version = table.desc.add_column(name.clone(), typ);
2173
2174        let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2175            Statement::CreateTable(stmt) => {
2176                let version = ColumnOptionDef {
2177                    name: None,
2178                    option: ColumnOption::Versioned {
2179                        action: ColumnVersioned::Added,
2180                        version: next_version.into(),
2181                    },
2182                };
2183                let column = ColumnDef {
2184                    name: name.into(),
2185                    data_type: sql,
2186                    collation: None,
2187                    options: vec![version],
2188                };
2189                stmt.columns.push(column);
2190                Ok(())
2191            }
2192            _ => Err(()),
2193        };
2194
2195        self.update_sql(update)
2196            .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2197        Ok(next_version)
2198    }
2199
2200    /// Updates the create_sql field of this item. Returns an error if this is a builtin item,
2201    /// otherwise returns f's result.
2202    pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2203    where
2204        F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2205    {
2206        let create_sql = match self {
2207            CatalogItem::Table(Table { create_sql, .. })
2208            | CatalogItem::Type(Type { create_sql, .. })
2209            | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2210            CatalogItem::Sink(Sink { create_sql, .. })
2211            | CatalogItem::View(View { create_sql, .. })
2212            | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2213            | CatalogItem::Index(Index { create_sql, .. })
2214            | CatalogItem::Secret(Secret { create_sql, .. })
2215            | CatalogItem::Connection(Connection { create_sql, .. })
2216            | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
2217            CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2218        };
2219        let Some(create_sql) = create_sql else {
2220            return Err(());
2221        };
2222        let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2223            .expect("non-system items must be parseable")
2224            .into_element()
2225            .ast;
2226        debug!("rewrite: {}", ast.to_ast_string_redacted());
2227        let t = f(&mut ast)?;
2228        *create_sql = ast.to_ast_string_stable();
2229        debug!("rewrote: {}", ast.to_ast_string_redacted());
2230        Ok(t)
2231    }
2232
2233    /// If the object is considered a "compute object"
2234    /// (i.e., it is managed by the compute controller),
2235    /// this function returns its cluster ID. Otherwise, it returns nothing.
2236    ///
2237    /// This function differs from `cluster_id` because while all
2238    /// compute objects run on a cluster, the converse is not true.
2239    pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2240        match self {
2241            CatalogItem::Index(index) => Some(index.cluster_id),
2242            CatalogItem::Table(_)
2243            | CatalogItem::Source(_)
2244            | CatalogItem::Log(_)
2245            | CatalogItem::View(_)
2246            | CatalogItem::MaterializedView(_)
2247            | CatalogItem::Sink(_)
2248            | CatalogItem::Type(_)
2249            | CatalogItem::Func(_)
2250            | CatalogItem::Secret(_)
2251            | CatalogItem::Connection(_)
2252            | CatalogItem::ContinualTask(_) => None,
2253        }
2254    }
2255
2256    pub fn cluster_id(&self) -> Option<ClusterId> {
2257        match self {
2258            CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2259            CatalogItem::Index(index) => Some(index.cluster_id),
2260            CatalogItem::Source(source) => match &source.data_source {
2261                DataSourceDesc::Ingestion { cluster_id, .. }
2262                | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
2263                // This is somewhat of a lie because the export runs on the same
2264                // cluster as its ingestion but we don't yet have a way of
2265                // cross-referencing the items
2266                DataSourceDesc::IngestionExport { .. } => None,
2267                DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2268                DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => None,
2269            },
2270            CatalogItem::Sink(sink) => Some(sink.cluster_id),
2271            CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
2272            CatalogItem::Table(_)
2273            | CatalogItem::Log(_)
2274            | CatalogItem::View(_)
2275            | CatalogItem::Type(_)
2276            | CatalogItem::Func(_)
2277            | CatalogItem::Secret(_)
2278            | CatalogItem::Connection(_) => None,
2279        }
2280    }
2281
2282    /// The custom compaction window, if any has been set. This does not reflect any propagated
2283    /// compaction window (i.e., source -> subsource).
2284    pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2285        match self {
2286            CatalogItem::Table(table) => table.custom_logical_compaction_window,
2287            CatalogItem::Source(source) => source.custom_logical_compaction_window,
2288            CatalogItem::Index(index) => index.custom_logical_compaction_window,
2289            CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2290            CatalogItem::Log(_)
2291            | CatalogItem::View(_)
2292            | CatalogItem::Sink(_)
2293            | CatalogItem::Type(_)
2294            | CatalogItem::Func(_)
2295            | CatalogItem::Secret(_)
2296            | CatalogItem::Connection(_)
2297            | CatalogItem::ContinualTask(_) => None,
2298        }
2299    }
2300
2301    /// Mutable access to the custom compaction window, or None if this type does not support custom
2302    /// compaction windows. This does not reflect any propagated compaction window (i.e., source ->
2303    /// subsource).
2304    pub fn custom_logical_compaction_window_mut(
2305        &mut self,
2306    ) -> Option<&mut Option<CompactionWindow>> {
2307        let cw = match self {
2308            CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2309            CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2310            CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2311            CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2312            CatalogItem::Log(_)
2313            | CatalogItem::View(_)
2314            | CatalogItem::Sink(_)
2315            | CatalogItem::Type(_)
2316            | CatalogItem::Func(_)
2317            | CatalogItem::Secret(_)
2318            | CatalogItem::Connection(_)
2319            | CatalogItem::ContinualTask(_) => return None,
2320        };
2321        Some(cw)
2322    }
2323
2324    /// The initial compaction window, for objects that have one; that is, tables, sources, indexes,
2325    /// and MVs. This does not reflect any propagated compaction window (i.e., source -> subsource).
2326    ///
2327    /// If `custom_logical_compaction_window()` returns something, use that.  Otherwise, use a
2328    /// sensible default (currently 1s).
2329    ///
2330    /// For objects that do not have the concept of compaction window, return None.
2331    pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2332        let custom_logical_compaction_window = match self {
2333            CatalogItem::Table(_)
2334            | CatalogItem::Source(_)
2335            | CatalogItem::Index(_)
2336            | CatalogItem::MaterializedView(_)
2337            | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
2338            CatalogItem::Log(_)
2339            | CatalogItem::View(_)
2340            | CatalogItem::Sink(_)
2341            | CatalogItem::Type(_)
2342            | CatalogItem::Func(_)
2343            | CatalogItem::Secret(_)
2344            | CatalogItem::Connection(_) => return None,
2345        };
2346        Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2347    }
2348
2349    /// Whether the item's logical compaction window
2350    /// is controlled by the METRICS_RETENTION
2351    /// system var.
2352    pub fn is_retained_metrics_object(&self) -> bool {
2353        match self {
2354            CatalogItem::Table(table) => table.is_retained_metrics_object,
2355            CatalogItem::Source(source) => source.is_retained_metrics_object,
2356            CatalogItem::Index(index) => index.is_retained_metrics_object,
2357            CatalogItem::Log(_)
2358            | CatalogItem::View(_)
2359            | CatalogItem::MaterializedView(_)
2360            | CatalogItem::Sink(_)
2361            | CatalogItem::Type(_)
2362            | CatalogItem::Func(_)
2363            | CatalogItem::Secret(_)
2364            | CatalogItem::Connection(_)
2365            | CatalogItem::ContinualTask(_) => false,
2366        }
2367    }
2368
2369    pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2370        match self {
2371            CatalogItem::Table(table) => {
2372                let create_sql = table
2373                    .create_sql
2374                    .clone()
2375                    .expect("builtin tables cannot be serialized");
2376                let mut collections = table.collections.clone();
2377                let global_id = collections
2378                    .remove(&RelationVersion::root())
2379                    .expect("at least one version");
2380                (create_sql, global_id, collections)
2381            }
2382            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2383            CatalogItem::Source(source) => {
2384                assert!(
2385                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2386                    "cannot serialize introspection/builtin sources",
2387                );
2388                let create_sql = source
2389                    .create_sql
2390                    .clone()
2391                    .expect("builtin sources cannot be serialized");
2392                (create_sql, source.global_id, BTreeMap::new())
2393            }
2394            CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2395            CatalogItem::MaterializedView(mview) => {
2396                let mut collections = mview.collections.clone();
2397                let global_id = collections
2398                    .remove(&RelationVersion::root())
2399                    .expect("at least one version");
2400                (mview.create_sql.clone(), global_id, collections)
2401            }
2402            CatalogItem::Index(index) => {
2403                (index.create_sql.clone(), index.global_id, BTreeMap::new())
2404            }
2405            CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2406            CatalogItem::Type(typ) => {
2407                let create_sql = typ
2408                    .create_sql
2409                    .clone()
2410                    .expect("builtin types cannot be serialized");
2411                (create_sql, typ.global_id, BTreeMap::new())
2412            }
2413            CatalogItem::Secret(secret) => {
2414                (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2415            }
2416            CatalogItem::Connection(connection) => (
2417                connection.create_sql.clone(),
2418                connection.global_id,
2419                BTreeMap::new(),
2420            ),
2421            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2422            CatalogItem::ContinualTask(ct) => {
2423                (ct.create_sql.clone(), ct.global_id, BTreeMap::new())
2424            }
2425        }
2426    }
2427
2428    pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2429        match self {
2430            CatalogItem::Table(mut table) => {
2431                let create_sql = table
2432                    .create_sql
2433                    .expect("builtin tables cannot be serialized");
2434                let global_id = table
2435                    .collections
2436                    .remove(&RelationVersion::root())
2437                    .expect("at least one version");
2438                (create_sql, global_id, table.collections)
2439            }
2440            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2441            CatalogItem::Source(source) => {
2442                assert!(
2443                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2444                    "cannot serialize introspection/builtin sources",
2445                );
2446                let create_sql = source
2447                    .create_sql
2448                    .expect("builtin sources cannot be serialized");
2449                (create_sql, source.global_id, BTreeMap::new())
2450            }
2451            CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2452            CatalogItem::MaterializedView(mut mview) => {
2453                let global_id = mview
2454                    .collections
2455                    .remove(&RelationVersion::root())
2456                    .expect("at least one version");
2457                (mview.create_sql, global_id, mview.collections)
2458            }
2459            CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2460            CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2461            CatalogItem::Type(typ) => {
2462                let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2463                (create_sql, typ.global_id, BTreeMap::new())
2464            }
2465            CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2466            CatalogItem::Connection(connection) => {
2467                (connection.create_sql, connection.global_id, BTreeMap::new())
2468            }
2469            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2470            CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
2471        }
2472    }
2473
2474    /// Returns a global ID for a specific version selector. Returns `None` if the item does
2475    /// not have versions or if the version does not exist.
2476    pub fn global_id_for_version(&self, version: RelationVersionSelector) -> Option<GlobalId> {
2477        let collections = match self {
2478            CatalogItem::MaterializedView(mv) => &mv.collections,
2479            CatalogItem::Table(table) => &table.collections,
2480            CatalogItem::Source(source) => return Some(source.global_id),
2481            CatalogItem::Log(log) => return Some(log.global_id),
2482            CatalogItem::View(view) => return Some(view.global_id),
2483            CatalogItem::Sink(sink) => return Some(sink.global_id),
2484            CatalogItem::Index(index) => return Some(index.global_id),
2485            CatalogItem::Type(ty) => return Some(ty.global_id),
2486            CatalogItem::Func(func) => return Some(func.global_id),
2487            CatalogItem::Secret(secret) => return Some(secret.global_id),
2488            CatalogItem::Connection(conn) => return Some(conn.global_id),
2489            CatalogItem::ContinualTask(ct) => return Some(ct.global_id),
2490        };
2491        match version {
2492            RelationVersionSelector::Latest => collections.values().last().copied(),
2493            RelationVersionSelector::Specific(version) => collections.get(&version).copied(),
2494        }
2495    }
2496}
2497
2498impl CatalogEntry {
2499    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`], if it
2500    /// produces rows.
2501    pub fn relation_desc_latest(&self) -> Option<Cow<'_, RelationDesc>> {
2502        self.item.relation_desc(RelationVersionSelector::Latest)
2503    }
2504
2505    /// Reports if the item has columns.
2506    pub fn has_columns(&self) -> bool {
2507        match self.item() {
2508            CatalogItem::Type(Type { details, .. }) => {
2509                matches!(details.typ, CatalogType::Record { .. })
2510            }
2511            _ => self.relation_desc_latest().is_some(),
2512        }
2513    }
2514
2515    /// Returns the [`mz_sql::func::Func`] associated with this `CatalogEntry`.
2516    pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2517        self.item.func(self)
2518    }
2519
2520    /// Returns the inner [`Index`] if this entry is an index, else `None`.
2521    pub fn index(&self) -> Option<&Index> {
2522        match self.item() {
2523            CatalogItem::Index(idx) => Some(idx),
2524            _ => None,
2525        }
2526    }
2527
2528    /// Returns the inner [`MaterializedView`] if this entry is a materialized view, else `None`.
2529    pub fn materialized_view(&self) -> Option<&MaterializedView> {
2530        match self.item() {
2531            CatalogItem::MaterializedView(mv) => Some(mv),
2532            _ => None,
2533        }
2534    }
2535
2536    /// Returns the inner [`Table`] if this entry is a table, else `None`.
2537    pub fn table(&self) -> Option<&Table> {
2538        match self.item() {
2539            CatalogItem::Table(tbl) => Some(tbl),
2540            _ => None,
2541        }
2542    }
2543
2544    /// Returns the inner [`Source`] if this entry is a source, else `None`.
2545    pub fn source(&self) -> Option<&Source> {
2546        match self.item() {
2547            CatalogItem::Source(src) => Some(src),
2548            _ => None,
2549        }
2550    }
2551
2552    /// Returns the inner [`Sink`] if this entry is a sink, else `None`.
2553    pub fn sink(&self) -> Option<&Sink> {
2554        match self.item() {
2555            CatalogItem::Sink(sink) => Some(sink),
2556            _ => None,
2557        }
2558    }
2559
2560    /// Returns the inner [`Secret`] if this entry is a secret, else `None`.
2561    pub fn secret(&self) -> Option<&Secret> {
2562        match self.item() {
2563            CatalogItem::Secret(secret) => Some(secret),
2564            _ => None,
2565        }
2566    }
2567
2568    pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2569        match self.item() {
2570            CatalogItem::Connection(connection) => Ok(connection),
2571            _ => {
2572                let db_name = match self.name().qualifiers.database_spec {
2573                    ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2574                    ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2575                };
2576                Err(SqlCatalogError::UnknownConnection(format!(
2577                    "{}{}.{}",
2578                    db_name,
2579                    self.name().qualifiers.schema_spec,
2580                    self.name().item
2581                )))
2582            }
2583        }
2584    }
2585
2586    /// Returns the [`mz_storage_types::sources::SourceDesc`] associated with
2587    /// this `CatalogEntry`, if any.
2588    pub fn source_desc(
2589        &self,
2590    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2591        self.item.source_desc(self)
2592    }
2593
2594    /// Reports whether this catalog entry is a connection.
2595    pub fn is_connection(&self) -> bool {
2596        matches!(self.item(), CatalogItem::Connection(_))
2597    }
2598
2599    /// Reports whether this catalog entry is a table.
2600    pub fn is_table(&self) -> bool {
2601        matches!(self.item(), CatalogItem::Table(_))
2602    }
2603
2604    /// Reports whether this catalog entry is a source. Note that this includes
2605    /// subsources.
2606    pub fn is_source(&self) -> bool {
2607        matches!(self.item(), CatalogItem::Source(_))
2608    }
2609
2610    /// Reports whether this catalog entry is a subsource and, if it is, the
2611    /// ingestion it is an export of, as well as the item it exports.
2612    pub fn subsource_details(
2613        &self,
2614    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2615        match &self.item() {
2616            CatalogItem::Source(source) => match &source.data_source {
2617                DataSourceDesc::IngestionExport {
2618                    ingestion_id,
2619                    external_reference,
2620                    details,
2621                    data_config: _,
2622                } => Some((*ingestion_id, external_reference, details)),
2623                _ => None,
2624            },
2625            _ => None,
2626        }
2627    }
2628
2629    /// Reports whether this catalog entry is a source export and, if it is, the
2630    /// ingestion it is an export of, as well as the item it exports.
2631    pub fn source_export_details(
2632        &self,
2633    ) -> Option<(
2634        CatalogItemId,
2635        &UnresolvedItemName,
2636        &SourceExportDetails,
2637        &SourceExportDataConfig<ReferencedConnection>,
2638    )> {
2639        match &self.item() {
2640            CatalogItem::Source(source) => match &source.data_source {
2641                DataSourceDesc::IngestionExport {
2642                    ingestion_id,
2643                    external_reference,
2644                    details,
2645                    data_config,
2646                } => Some((*ingestion_id, external_reference, details, data_config)),
2647                _ => None,
2648            },
2649            CatalogItem::Table(table) => match &table.data_source {
2650                TableDataSource::DataSource {
2651                    desc:
2652                        DataSourceDesc::IngestionExport {
2653                            ingestion_id,
2654                            external_reference,
2655                            details,
2656                            data_config,
2657                        },
2658                    timeline: _,
2659                } => Some((*ingestion_id, external_reference, details, data_config)),
2660                _ => None,
2661            },
2662            _ => None,
2663        }
2664    }
2665
2666    /// Reports whether this catalog entry is a progress source.
2667    pub fn is_progress_source(&self) -> bool {
2668        self.item().is_progress_source()
2669    }
2670
2671    /// Returns the `GlobalId` of all of this entry's progress ID.
2672    pub fn progress_id(&self) -> Option<CatalogItemId> {
2673        match &self.item() {
2674            CatalogItem::Source(source) => match &source.data_source {
2675                DataSourceDesc::Ingestion { .. } => Some(self.id),
2676                DataSourceDesc::OldSyntaxIngestion {
2677                    progress_subsource, ..
2678                } => Some(*progress_subsource),
2679                DataSourceDesc::IngestionExport { .. }
2680                | DataSourceDesc::Introspection(_)
2681                | DataSourceDesc::Progress
2682                | DataSourceDesc::Webhook { .. } => None,
2683            },
2684            CatalogItem::Table(_)
2685            | CatalogItem::Log(_)
2686            | CatalogItem::View(_)
2687            | CatalogItem::MaterializedView(_)
2688            | CatalogItem::Sink(_)
2689            | CatalogItem::Index(_)
2690            | CatalogItem::Type(_)
2691            | CatalogItem::Func(_)
2692            | CatalogItem::Secret(_)
2693            | CatalogItem::Connection(_)
2694            | CatalogItem::ContinualTask(_) => None,
2695        }
2696    }
2697
2698    /// Reports whether this catalog entry is a sink.
2699    pub fn is_sink(&self) -> bool {
2700        matches!(self.item(), CatalogItem::Sink(_))
2701    }
2702
2703    /// Reports whether this catalog entry is a materialized view.
2704    pub fn is_materialized_view(&self) -> bool {
2705        matches!(self.item(), CatalogItem::MaterializedView(_))
2706    }
2707
2708    /// Reports whether this catalog entry is a view.
2709    pub fn is_view(&self) -> bool {
2710        matches!(self.item(), CatalogItem::View(_))
2711    }
2712
2713    /// Reports whether this catalog entry is a secret.
2714    pub fn is_secret(&self) -> bool {
2715        matches!(self.item(), CatalogItem::Secret(_))
2716    }
2717
2718    /// Reports whether this catalog entry is an introspection source.
2719    pub fn is_introspection_source(&self) -> bool {
2720        matches!(self.item(), CatalogItem::Log(_))
2721    }
2722
2723    /// Reports whether this catalog entry is an index.
2724    pub fn is_index(&self) -> bool {
2725        matches!(self.item(), CatalogItem::Index(_))
2726    }
2727
2728    /// Reports whether this catalog entry is a continual task.
2729    pub fn is_continual_task(&self) -> bool {
2730        matches!(self.item(), CatalogItem::ContinualTask(_))
2731    }
2732
2733    /// Reports whether this catalog entry can be treated as a relation, it can produce rows.
2734    pub fn is_relation(&self) -> bool {
2735        mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2736    }
2737
2738    /// Collects the identifiers of the objects that were encountered when
2739    /// resolving names in the item's DDL statement.
2740    pub fn references(&self) -> &ResolvedIds {
2741        self.item.references()
2742    }
2743
2744    /// Collects the identifiers of the objects used by this [`CatalogEntry`].
2745    ///
2746    /// Like [`CatalogEntry::references()`] but also includes objects that are not directly
2747    /// referenced. For example this will include any catalog objects used to implement functions
2748    /// and casts in the item.
2749    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2750        self.item.uses()
2751    }
2752
2753    /// Returns the `CatalogItem` associated with this catalog entry.
2754    pub fn item(&self) -> &CatalogItem {
2755        &self.item
2756    }
2757
2758    /// Returns the [`CatalogItemId`] of this catalog entry.
2759    pub fn id(&self) -> CatalogItemId {
2760        self.id
2761    }
2762
2763    /// Returns all of the [`GlobalId`]s associated with this item.
2764    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2765        self.item().global_ids()
2766    }
2767
2768    pub fn latest_global_id(&self) -> GlobalId {
2769        self.item().latest_global_id()
2770    }
2771
2772    /// Returns the OID of this catalog entry.
2773    pub fn oid(&self) -> u32 {
2774        self.oid
2775    }
2776
2777    /// Returns the fully qualified name of this catalog entry.
2778    pub fn name(&self) -> &QualifiedItemName {
2779        &self.name
2780    }
2781
2782    /// Returns the identifiers of the dataflows that are directly referenced by this dataflow.
2783    pub fn referenced_by(&self) -> &[CatalogItemId] {
2784        &self.referenced_by
2785    }
2786
2787    /// Returns the identifiers of the dataflows that depend upon this dataflow.
2788    pub fn used_by(&self) -> &[CatalogItemId] {
2789        &self.used_by
2790    }
2791
2792    /// Returns the connection ID that this item belongs to, if this item is
2793    /// temporary.
2794    pub fn conn_id(&self) -> Option<&ConnectionId> {
2795        self.item.conn_id()
2796    }
2797
2798    /// Returns the role ID of the entry owner.
2799    pub fn owner_id(&self) -> &RoleId {
2800        &self.owner_id
2801    }
2802
2803    /// Returns the privileges of the entry.
2804    pub fn privileges(&self) -> &PrivilegeMap {
2805        &self.privileges
2806    }
2807}
2808
/// In-memory store of comments, grouped by the object they are attached to.
///
/// Each object maps to its own inner map keyed by an optional sub-component index.
/// NOTE(review): the index presumably identifies a column of a relation, with `None` standing
/// for the object itself — confirm against callers.
#[derive(Debug, Clone, Default)]
pub struct CommentsMap {
    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
}
2813
2814impl CommentsMap {
2815    pub fn update_comment(
2816        &mut self,
2817        object_id: CommentObjectId,
2818        sub_component: Option<usize>,
2819        comment: Option<String>,
2820    ) -> Option<String> {
2821        let object_comments = self.map.entry(object_id).or_default();
2822
2823        // Either replace the existing comment, or remove it if comment is None/NULL.
2824        let (empty, prev) = if let Some(comment) = comment {
2825            let prev = object_comments.insert(sub_component, comment);
2826            (false, prev)
2827        } else {
2828            let prev = object_comments.remove(&sub_component);
2829            (object_comments.is_empty(), prev)
2830        };
2831
2832        // Cleanup entries that are now empty.
2833        if empty {
2834            self.map.remove(&object_id);
2835        }
2836
2837        // Return the previous comment, if there was one, for easy removal.
2838        prev
2839    }
2840
2841    /// Remove all comments for `object_id` from the map.
2842    ///
2843    /// Generally there is one comment for a given [`CommentObjectId`], but in the case of
2844    /// relations you can also have comments on the individual columns. Dropping the comments for a
2845    /// relation will also drop all of the comments on any columns.
2846    pub fn drop_comments(
2847        &mut self,
2848        object_ids: &BTreeSet<CommentObjectId>,
2849    ) -> Vec<(CommentObjectId, Option<usize>, String)> {
2850        let mut removed_comments = Vec::new();
2851
2852        for object_id in object_ids {
2853            if let Some(comments) = self.map.remove(object_id) {
2854                let removed = comments
2855                    .into_iter()
2856                    .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
2857                removed_comments.extend(removed);
2858            }
2859        }
2860
2861        removed_comments
2862    }
2863
2864    pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
2865        self.map
2866            .iter()
2867            .map(|(id, comments)| {
2868                comments
2869                    .iter()
2870                    .map(|(pos, comment)| (*id, *pos, comment.as_str()))
2871            })
2872            .flatten()
2873    }
2874
2875    pub fn get_object_comments(
2876        &self,
2877        object_id: CommentObjectId,
2878    ) -> Option<&BTreeMap<Option<usize>, String>> {
2879        self.map.get(&object_id)
2880    }
2881}
2882
2883impl Serialize for CommentsMap {
2884    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2885    where
2886        S: serde::Serializer,
2887    {
2888        let comment_count = self
2889            .map
2890            .iter()
2891            .map(|(_object_id, comments)| comments.len())
2892            .sum();
2893
2894        let mut seq = serializer.serialize_seq(Some(comment_count))?;
2895        for (object_id, sub) in &self.map {
2896            for (sub_component, comment) in sub {
2897                seq.serialize_element(&(
2898                    format!("{object_id:?}"),
2899                    format!("{sub_component:?}"),
2900                    comment,
2901                ))?;
2902            }
2903        }
2904        seq.end()
2905    }
2906}
2907
/// The set of all default privileges, keyed by the [`DefaultPrivilegeObject`] they apply to.
///
/// Each object maps to the privileges to grant, organized per grantee role.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
pub struct DefaultPrivileges {
    // Map keys are serialized as strings via `map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
}
2913
// A newtype is used here because `DefaultPrivileges` would otherwise contain two nested levels
// of `BTreeMap`, both of which need `map_key_to_string` for serialization. Wrapping the inner
// map lets each level carry its own `serialize_with` attribute.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
struct RoleDefaultPrivileges(
    /// Denormalized: the key is the grantee role, which is also stored in the value.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
);
2922
2923impl Deref for RoleDefaultPrivileges {
2924    type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
2925
2926    fn deref(&self) -> &Self::Target {
2927        &self.0
2928    }
2929}
2930
2931impl DerefMut for RoleDefaultPrivileges {
2932    fn deref_mut(&mut self) -> &mut Self::Target {
2933        &mut self.0
2934    }
2935}
2936
2937impl DefaultPrivileges {
2938    /// Add a new default privilege into the set of all default privileges.
2939    pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
2940        if privilege.acl_mode.is_empty() {
2941            return;
2942        }
2943
2944        let privileges = self.privileges.entry(object).or_default();
2945        if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2946            default_privilege.acl_mode |= privilege.acl_mode;
2947        } else {
2948            privileges.insert(privilege.grantee, privilege);
2949        }
2950    }
2951
2952    /// Revoke a default privilege from the set of all default privileges.
2953    pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
2954        if let Some(privileges) = self.privileges.get_mut(object) {
2955            if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2956                default_privilege.acl_mode =
2957                    default_privilege.acl_mode.difference(privilege.acl_mode);
2958                if default_privilege.acl_mode.is_empty() {
2959                    privileges.remove(&privilege.grantee);
2960                }
2961            }
2962            if privileges.is_empty() {
2963                self.privileges.remove(object);
2964            }
2965        }
2966    }
2967
2968    /// Get the privileges that will be granted on all objects matching `object` to `grantee`, if
2969    /// any exist.
2970    pub fn get_privileges_for_grantee(
2971        &self,
2972        object: &DefaultPrivilegeObject,
2973        grantee: &RoleId,
2974    ) -> Option<&AclMode> {
2975        self.privileges
2976            .get(object)
2977            .and_then(|privileges| privileges.get(grantee))
2978            .map(|privilege| &privilege.acl_mode)
2979    }
2980
2981    /// Get all default privileges that apply to the provided object details.
2982    pub fn get_applicable_privileges(
2983        &self,
2984        role_id: RoleId,
2985        database_id: Option<DatabaseId>,
2986        schema_id: Option<SchemaId>,
2987        object_type: mz_sql::catalog::ObjectType,
2988    ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
2989        // Privileges consider all relations to be of type table due to PostgreSQL compatibility. We
2990        // don't require the caller to worry about that and we will map their `object_type` to the
2991        // correct type for privileges.
2992        let privilege_object_type = if object_type.is_relation() {
2993            mz_sql::catalog::ObjectType::Table
2994        } else {
2995            object_type
2996        };
2997        let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
2998
2999        // Collect all entries that apply to the provided object details.
3000        // If either `database_id` or `schema_id` are `None`, then we might end up with duplicate
3001        // entries in the vec below. That's OK because we consolidate the results after.
3002        [
3003            DefaultPrivilegeObject {
3004                role_id,
3005                database_id,
3006                schema_id,
3007                object_type: privilege_object_type,
3008            },
3009            DefaultPrivilegeObject {
3010                role_id,
3011                database_id,
3012                schema_id: None,
3013                object_type: privilege_object_type,
3014            },
3015            DefaultPrivilegeObject {
3016                role_id,
3017                database_id: None,
3018                schema_id: None,
3019                object_type: privilege_object_type,
3020            },
3021            DefaultPrivilegeObject {
3022                role_id: RoleId::Public,
3023                database_id,
3024                schema_id,
3025                object_type: privilege_object_type,
3026            },
3027            DefaultPrivilegeObject {
3028                role_id: RoleId::Public,
3029                database_id,
3030                schema_id: None,
3031                object_type: privilege_object_type,
3032            },
3033            DefaultPrivilegeObject {
3034                role_id: RoleId::Public,
3035                database_id: None,
3036                schema_id: None,
3037                object_type: privilege_object_type,
3038            },
3039        ]
3040        .into_iter()
3041        .filter_map(|object| self.privileges.get(&object))
3042        .flat_map(|acl_map| acl_map.values())
3043        // Consolidate privileges with a common grantee.
3044        .fold(
3045            BTreeMap::new(),
3046            |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
3047                let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
3048                *accum_acl_mode |= *acl_mode;
3049                accum
3050            },
3051        )
3052        .into_iter()
3053        // Restrict the acl_mode to only privileges valid for the provided object type. If the
3054        // default privilege has an object type of Table, then it may contain privileges valid for
3055        // tables but not other relations. If the passed in object type is another relation, then
3056        // we need to remove any privilege that is not valid for the specified relation.
3057        .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
3058        // Filter out empty privileges.
3059        .filter(|(_, acl_mode)| !acl_mode.is_empty())
3060        .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
3061            grantee: *grantee,
3062            acl_mode,
3063        })
3064    }
3065
3066    pub fn iter(
3067        &self,
3068    ) -> impl Iterator<
3069        Item = (
3070            &DefaultPrivilegeObject,
3071            impl Iterator<Item = &DefaultPrivilegeAclItem>,
3072        ),
3073    > {
3074        self.privileges
3075            .iter()
3076            .map(|(object, acl_map)| (object, acl_map.values()))
3077    }
3078}
3079
/// In-memory configuration of a cluster.
///
/// Converts field-by-field to and from `durable::ClusterConfig`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterConfig {
    /// Whether the cluster is managed (and with what settings) or unmanaged.
    pub variant: ClusterVariant,
    /// Optional workload class label for the cluster.
    pub workload_class: Option<String>,
}
3085
3086impl ClusterConfig {
3087    pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
3088        match &self.variant {
3089            ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
3090            ClusterVariant::Unmanaged => None,
3091        }
3092    }
3093}
3094
3095impl From<ClusterConfig> for durable::ClusterConfig {
3096    fn from(config: ClusterConfig) -> Self {
3097        Self {
3098            variant: config.variant.into(),
3099            workload_class: config.workload_class,
3100        }
3101    }
3102}
3103
3104impl From<durable::ClusterConfig> for ClusterConfig {
3105    fn from(config: durable::ClusterConfig) -> Self {
3106        Self {
3107            variant: config.variant.into(),
3108            workload_class: config.workload_class,
3109        }
3110    }
3111}
3112
/// In-memory settings of a managed cluster.
///
/// Converts field-by-field to and from `durable::ClusterVariantManaged`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterVariantManaged {
    /// Name of the cluster's size.
    pub size: String,
    /// Availability zones configured for the cluster.
    pub availability_zones: Vec<String>,
    /// Logging configuration for the cluster's replicas.
    pub logging: ReplicaLogging,
    /// Configured replication factor.
    pub replication_factor: u32,
    /// Optimizer feature overrides; exposed through `ClusterConfig::features`.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// Schedule configured for the cluster.
    pub schedule: ClusterSchedule,
}
3122
3123impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3124    fn from(managed: ClusterVariantManaged) -> Self {
3125        Self {
3126            size: managed.size,
3127            availability_zones: managed.availability_zones,
3128            logging: managed.logging,
3129            replication_factor: managed.replication_factor,
3130            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3131            schedule: managed.schedule,
3132        }
3133    }
3134}
3135
3136impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3137    fn from(managed: durable::ClusterVariantManaged) -> Self {
3138        Self {
3139            size: managed.size,
3140            availability_zones: managed.availability_zones,
3141            logging: managed.logging,
3142            replication_factor: managed.replication_factor,
3143            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3144            schedule: managed.schedule,
3145        }
3146    }
3147}
3148
/// Whether a cluster is managed or unmanaged.
///
/// Converts to and from `durable::ClusterVariant`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterVariant {
    /// A managed cluster, together with its settings.
    Managed(ClusterVariantManaged),
    /// An unmanaged cluster; carries no settings here.
    Unmanaged,
}
3154
3155impl From<ClusterVariant> for durable::ClusterVariant {
3156    fn from(variant: ClusterVariant) -> Self {
3157        match variant {
3158            ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3159            ClusterVariant::Unmanaged => Self::Unmanaged,
3160        }
3161    }
3162}
3163
3164impl From<durable::ClusterVariant> for ClusterVariant {
3165    fn from(variant: durable::ClusterVariant) -> Self {
3166        match variant {
3167            durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3168            durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3169        }
3170    }
3171}
3172
3173impl mz_sql::catalog::CatalogDatabase for Database {
3174    fn name(&self) -> &str {
3175        &self.name
3176    }
3177
3178    fn id(&self) -> DatabaseId {
3179        self.id
3180    }
3181
3182    fn has_schemas(&self) -> bool {
3183        !self.schemas_by_name.is_empty()
3184    }
3185
3186    fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
3187        &self.schemas_by_name
3188    }
3189
3190    // `as` is ok to use to cast to a trait object.
3191    #[allow(clippy::as_conversions)]
3192    fn schemas(&self) -> Vec<&dyn CatalogSchema> {
3193        self.schemas_by_id
3194            .values()
3195            .map(|schema| schema as &dyn CatalogSchema)
3196            .collect()
3197    }
3198
3199    fn owner_id(&self) -> RoleId {
3200        self.owner_id
3201    }
3202
3203    fn privileges(&self) -> &PrivilegeMap {
3204        &self.privileges
3205    }
3206}
3207
3208impl mz_sql::catalog::CatalogSchema for Schema {
3209    fn database(&self) -> &ResolvedDatabaseSpecifier {
3210        &self.name.database
3211    }
3212
3213    fn name(&self) -> &QualifiedSchemaName {
3214        &self.name
3215    }
3216
3217    fn id(&self) -> &SchemaSpecifier {
3218        &self.id
3219    }
3220
3221    fn has_items(&self) -> bool {
3222        !self.items.is_empty()
3223    }
3224
3225    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
3226        Box::new(
3227            self.items
3228                .values()
3229                .chain(self.functions.values())
3230                .chain(self.types.values())
3231                .copied(),
3232        )
3233    }
3234
3235    fn owner_id(&self) -> RoleId {
3236        self.owner_id
3237    }
3238
3239    fn privileges(&self) -> &PrivilegeMap {
3240        &self.privileges
3241    }
3242}
3243
3244impl mz_sql::catalog::CatalogRole for Role {
3245    fn name(&self) -> &str {
3246        &self.name
3247    }
3248
3249    fn id(&self) -> RoleId {
3250        self.id
3251    }
3252
3253    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
3254        &self.membership.map
3255    }
3256
3257    fn attributes(&self) -> &RoleAttributes {
3258        &self.attributes
3259    }
3260
3261    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
3262        &self.vars.map
3263    }
3264}
3265
3266impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
3267    fn name(&self) -> &str {
3268        &self.name
3269    }
3270
3271    fn id(&self) -> NetworkPolicyId {
3272        self.id
3273    }
3274
3275    fn owner_id(&self) -> RoleId {
3276        self.owner_id
3277    }
3278
3279    fn privileges(&self) -> &PrivilegeMap {
3280        &self.privileges
3281    }
3282}
3283
3284impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
3285    fn name(&self) -> &str {
3286        &self.name
3287    }
3288
3289    fn id(&self) -> ClusterId {
3290        self.id
3291    }
3292
3293    fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
3294        &self.bound_objects
3295    }
3296
3297    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
3298        &self.replica_id_by_name_
3299    }
3300
3301    // `as` is ok to use to cast to a trait object.
3302    #[allow(clippy::as_conversions)]
3303    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
3304        self.replicas()
3305            .map(|replica| replica as &dyn CatalogClusterReplica)
3306            .collect()
3307    }
3308
3309    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
3310        self.replica(id).expect("catalog out of sync")
3311    }
3312
3313    fn owner_id(&self) -> RoleId {
3314        self.owner_id
3315    }
3316
3317    fn privileges(&self) -> &PrivilegeMap {
3318        &self.privileges
3319    }
3320
3321    fn is_managed(&self) -> bool {
3322        self.is_managed()
3323    }
3324
3325    fn managed_size(&self) -> Option<&str> {
3326        match &self.config.variant {
3327            ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
3328            _ => None,
3329        }
3330    }
3331
3332    fn schedule(&self) -> Option<&ClusterSchedule> {
3333        match &self.config.variant {
3334            ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
3335            _ => None,
3336        }
3337    }
3338
3339    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
3340        self.try_to_plan()
3341    }
3342}
3343
3344impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
3345    fn name(&self) -> &str {
3346        &self.name
3347    }
3348
3349    fn cluster_id(&self) -> ClusterId {
3350        self.cluster_id
3351    }
3352
3353    fn replica_id(&self) -> ReplicaId {
3354        self.replica_id
3355    }
3356
3357    fn owner_id(&self) -> RoleId {
3358        self.owner_id
3359    }
3360
3361    fn internal(&self) -> bool {
3362        self.config.location.internal()
3363    }
3364}
3365
3366impl mz_sql::catalog::CatalogItem for CatalogEntry {
3367    fn name(&self) -> &QualifiedItemName {
3368        self.name()
3369    }
3370
3371    fn id(&self) -> CatalogItemId {
3372        self.id()
3373    }
3374
3375    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3376        Box::new(self.global_ids())
3377    }
3378
3379    fn oid(&self) -> u32 {
3380        self.oid()
3381    }
3382
3383    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3384        self.func()
3385    }
3386
3387    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3388        self.source_desc()
3389    }
3390
3391    fn connection(
3392        &self,
3393    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3394    {
3395        Ok(self.connection()?.details.to_connection())
3396    }
3397
3398    fn create_sql(&self) -> &str {
3399        match self.item() {
3400            CatalogItem::Table(Table { create_sql, .. }) => {
3401                create_sql.as_deref().unwrap_or("<builtin>")
3402            }
3403            CatalogItem::Source(Source { create_sql, .. }) => {
3404                create_sql.as_deref().unwrap_or("<builtin>")
3405            }
3406            CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3407            CatalogItem::View(View { create_sql, .. }) => create_sql,
3408            CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3409            CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3410            CatalogItem::Type(Type { create_sql, .. }) => {
3411                create_sql.as_deref().unwrap_or("<builtin>")
3412            }
3413            CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3414            CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3415            CatalogItem::Func(_) => "<builtin>",
3416            CatalogItem::Log(_) => "<builtin>",
3417            CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
3418        }
3419    }
3420
3421    fn item_type(&self) -> SqlCatalogItemType {
3422        self.item().typ()
3423    }
3424
3425    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3426        if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3427            Some((keys, *on))
3428        } else {
3429            None
3430        }
3431    }
3432
3433    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3434        if let CatalogItem::Table(Table {
3435            data_source: TableDataSource::TableWrites { defaults },
3436            ..
3437        }) = self.item()
3438        {
3439            Some(defaults.as_slice())
3440        } else {
3441            None
3442        }
3443    }
3444
3445    fn replacement_target(&self) -> Option<CatalogItemId> {
3446        if let CatalogItem::MaterializedView(mv) = self.item() {
3447            mv.replacement_target
3448        } else {
3449            None
3450        }
3451    }
3452
3453    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3454        if let CatalogItem::Type(Type { details, .. }) = self.item() {
3455            Some(details)
3456        } else {
3457            None
3458        }
3459    }
3460
3461    fn references(&self) -> &ResolvedIds {
3462        self.references()
3463    }
3464
3465    fn uses(&self) -> BTreeSet<CatalogItemId> {
3466        self.uses()
3467    }
3468
3469    fn referenced_by(&self) -> &[CatalogItemId] {
3470        self.referenced_by()
3471    }
3472
3473    fn used_by(&self) -> &[CatalogItemId] {
3474        self.used_by()
3475    }
3476
3477    fn subsource_details(
3478        &self,
3479    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3480        self.subsource_details()
3481    }
3482
3483    fn source_export_details(
3484        &self,
3485    ) -> Option<(
3486        CatalogItemId,
3487        &UnresolvedItemName,
3488        &SourceExportDetails,
3489        &SourceExportDataConfig<ReferencedConnection>,
3490    )> {
3491        self.source_export_details()
3492    }
3493
3494    fn is_progress_source(&self) -> bool {
3495        self.is_progress_source()
3496    }
3497
3498    fn progress_id(&self) -> Option<CatalogItemId> {
3499        self.progress_id()
3500    }
3501
3502    fn owner_id(&self) -> RoleId {
3503        self.owner_id
3504    }
3505
3506    fn privileges(&self) -> &PrivilegeMap {
3507        &self.privileges
3508    }
3509
3510    fn cluster_id(&self) -> Option<ClusterId> {
3511        self.item().cluster_id()
3512    }
3513
3514    fn at_version(
3515        &self,
3516        version: RelationVersionSelector,
3517    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3518        Box::new(CatalogCollectionEntry {
3519            entry: self.clone(),
3520            version,
3521        })
3522    }
3523
3524    fn latest_version(&self) -> Option<RelationVersion> {
3525        self.table().map(|t| t.desc.latest_version())
3526    }
3527}
3528
3529/// A single update to the catalog state.
3530#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
3531pub struct StateUpdate {
3532    pub kind: StateUpdateKind,
3533    pub ts: Timestamp,
3534    pub diff: StateDiff,
3535}
3536
3537/// The contents of a single state update.
3538///
3539/// Variants are listed in dependency order.
3540#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
3541pub enum StateUpdateKind {
3542    Role(durable::objects::Role),
3543    RoleAuth(durable::objects::RoleAuth),
3544    Database(durable::objects::Database),
3545    Schema(durable::objects::Schema),
3546    DefaultPrivilege(durable::objects::DefaultPrivilege),
3547    SystemPrivilege(MzAclItem),
3548    SystemConfiguration(durable::objects::SystemConfiguration),
3549    Cluster(durable::objects::Cluster),
3550    NetworkPolicy(durable::objects::NetworkPolicy),
3551    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3552    ClusterReplica(durable::objects::ClusterReplica),
3553    SourceReferences(durable::objects::SourceReferences),
3554    SystemObjectMapping(durable::objects::SystemObjectMapping),
3555    // Temporary items are not actually updated via the durable catalog, but
3556    // this allows us to model them the same way as all other items in parts of
3557    // the pipeline.
3558    TemporaryItem(TemporaryItem),
3559    Item(durable::objects::Item),
3560    Comment(durable::objects::Comment),
3561    AuditLog(durable::objects::AuditLog),
3562    // Storage updates.
3563    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3564    UnfinalizedShard(durable::objects::UnfinalizedShard),
3565}
3566
/// The only diffs a catalog state update may carry.
///
/// The derived ordering places `Retraction` before `Addition`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum StateDiff {
    /// The update removes the object.
    Retraction,
    /// The update adds the object.
    Addition,
}
3573
3574impl From<StateDiff> for Diff {
3575    fn from(diff: StateDiff) -> Self {
3576        match diff {
3577            StateDiff::Retraction => Diff::MINUS_ONE,
3578            StateDiff::Addition => Diff::ONE,
3579        }
3580    }
3581}
3582impl TryFrom<Diff> for StateDiff {
3583    type Error = String;
3584
3585    fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3586        match diff {
3587            Diff::MINUS_ONE => Ok(Self::Retraction),
3588            Diff::ONE => Ok(Self::Addition),
3589            diff => Err(format!("invalid diff {diff}")),
3590        }
3591    }
3592}
3593
3594/// Information needed to process an update to a temporary item.
3595#[derive(Debug, Clone, Ord, PartialOrd, PartialEq, Eq)]
3596pub struct TemporaryItem {
3597    pub id: CatalogItemId,
3598    pub oid: u32,
3599    pub global_id: GlobalId,
3600    pub schema_id: SchemaId,
3601    pub name: String,
3602    pub conn_id: Option<ConnectionId>,
3603    pub create_sql: String,
3604    pub owner_id: RoleId,
3605    pub privileges: Vec<MzAclItem>,
3606    pub extra_versions: BTreeMap<RelationVersion, GlobalId>,
3607}
3608
3609impl From<CatalogEntry> for TemporaryItem {
3610    fn from(entry: CatalogEntry) -> Self {
3611        let conn_id = entry.conn_id().cloned();
3612        let (create_sql, global_id, extra_versions) = entry.item.to_serialized();
3613
3614        TemporaryItem {
3615            id: entry.id,
3616            oid: entry.oid,
3617            global_id,
3618            schema_id: entry.name.qualifiers.schema_spec.into(),
3619            name: entry.name.item,
3620            conn_id,
3621            create_sql,
3622            owner_id: entry.owner_id,
3623            privileges: entry.privileges.into_all_values().collect(),
3624            extra_versions,
3625        }
3626    }
3627}
3628
3629impl TemporaryItem {
3630    pub fn item_type(&self) -> CatalogItemType {
3631        item_type(&self.create_sql)
3632    }
3633}
3634
3635/// The same as [`StateUpdateKind`], but without `TemporaryItem` so we can derive [`Ord`].
3636#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
3637pub enum BootstrapStateUpdateKind {
3638    Role(durable::objects::Role),
3639    RoleAuth(durable::objects::RoleAuth),
3640    Database(durable::objects::Database),
3641    Schema(durable::objects::Schema),
3642    DefaultPrivilege(durable::objects::DefaultPrivilege),
3643    SystemPrivilege(MzAclItem),
3644    SystemConfiguration(durable::objects::SystemConfiguration),
3645    Cluster(durable::objects::Cluster),
3646    NetworkPolicy(durable::objects::NetworkPolicy),
3647    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3648    ClusterReplica(durable::objects::ClusterReplica),
3649    SourceReferences(durable::objects::SourceReferences),
3650    SystemObjectMapping(durable::objects::SystemObjectMapping),
3651    Item(durable::objects::Item),
3652    Comment(durable::objects::Comment),
3653    AuditLog(durable::objects::AuditLog),
3654    // Storage updates.
3655    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3656    UnfinalizedShard(durable::objects::UnfinalizedShard),
3657}
3658
3659impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3660    fn from(value: BootstrapStateUpdateKind) -> Self {
3661        match value {
3662            BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3663            BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3664            BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3665            BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3666            BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3667                StateUpdateKind::DefaultPrivilege(kind)
3668            }
3669            BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3670                StateUpdateKind::SystemPrivilege(kind)
3671            }
3672            BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3673                StateUpdateKind::SystemConfiguration(kind)
3674            }
3675            BootstrapStateUpdateKind::SourceReferences(kind) => {
3676                StateUpdateKind::SourceReferences(kind)
3677            }
3678            BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3679            BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3680            BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3681                StateUpdateKind::IntrospectionSourceIndex(kind)
3682            }
3683            BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3684            BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3685                StateUpdateKind::SystemObjectMapping(kind)
3686            }
3687            BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3688            BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3689            BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3690            BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3691                StateUpdateKind::StorageCollectionMetadata(kind)
3692            }
3693            BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3694                StateUpdateKind::UnfinalizedShard(kind)
3695            }
3696        }
3697    }
3698}
3699
3700impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3701    type Error = TemporaryItem;
3702
3703    fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3704        match value {
3705            StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3706            StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3707            StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3708            StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3709            StateUpdateKind::DefaultPrivilege(kind) => {
3710                Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3711            }
3712            StateUpdateKind::SystemPrivilege(kind) => {
3713                Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3714            }
3715            StateUpdateKind::SystemConfiguration(kind) => {
3716                Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3717            }
3718            StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3719            StateUpdateKind::NetworkPolicy(kind) => {
3720                Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3721            }
3722            StateUpdateKind::IntrospectionSourceIndex(kind) => {
3723                Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3724            }
3725            StateUpdateKind::ClusterReplica(kind) => {
3726                Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3727            }
3728            StateUpdateKind::SourceReferences(kind) => {
3729                Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3730            }
3731            StateUpdateKind::SystemObjectMapping(kind) => {
3732                Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3733            }
3734            StateUpdateKind::TemporaryItem(kind) => Err(kind),
3735            StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3736            StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3737            StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3738            StateUpdateKind::StorageCollectionMetadata(kind) => {
3739                Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3740            }
3741            StateUpdateKind::UnfinalizedShard(kind) => {
3742                Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3743            }
3744        }
3745    }
3746}