mz_catalog/memory/
objects.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The current types used by the in-memory Catalog. Many of the objects in this module are
11//! extremely similar to the objects found in [`crate::durable::objects`] but in a format that is
12//! easier consumed by higher layers.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18
19use chrono::{DateTime, Utc};
20use mz_adapter_types::compaction::CompactionWindow;
21use mz_adapter_types::connection::ConnectionId;
22use mz_compute_client::logging::LogVariant;
23use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
24use mz_controller_types::{ClusterId, ReplicaId};
25use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
26use mz_ore::collections::CollectionExt;
27use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
28use mz_repr::network_policy_id::NetworkPolicyId;
29use mz_repr::optimize::OptimizerFeatureOverrides;
30use mz_repr::refresh_schedule::RefreshSchedule;
31use mz_repr::role_id::RoleId;
32use mz_repr::{
33    CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
34    RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
35};
36use mz_sql::ast::display::AstDisplay;
37use mz_sql::ast::{
38    ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
39    UnresolvedItemName, Value, WithOptionValue,
40};
41use mz_sql::catalog::{
42    CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
43    CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogTypeDetails,
44    DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, RoleAttributes, RoleMembership,
45    RoleVars, SystemObjectType,
46};
47use mz_sql::names::{
48    Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
49    QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
50};
51use mz_sql::plan::{
52    ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
53    CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
54    HirRelationExpr, NetworkPolicyRule, PlanError, WebhookBodyFormat, WebhookHeaders,
55    WebhookValidation,
56};
57use mz_sql::rbac;
58use mz_sql::session::vars::OwnedVarInput;
59use mz_storage_client::controller::IntrospectionType;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
62use mz_storage_types::sources::load_generator::LoadGenerator;
63use mz_storage_types::sources::{
64    GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
65    SourceExportDetails, Timeline,
66};
67use serde::ser::SerializeSeq;
68use serde::{Deserialize, Serialize};
69use timely::progress::Antichain;
70use tracing::debug;
71
72use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
73use crate::durable;
74
75/// Used to update `self` from the input value while consuming the input value.
76pub trait UpdateFrom<T>: From<T> {
77    fn update_from(&mut self, from: T);
78}
79
80#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
81pub struct Database {
82    pub name: String,
83    pub id: DatabaseId,
84    pub oid: u32,
85    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
86    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
87    pub schemas_by_name: BTreeMap<String, SchemaId>,
88    pub owner_id: RoleId,
89    pub privileges: PrivilegeMap,
90}
91
92impl From<Database> for durable::Database {
93    fn from(database: Database) -> durable::Database {
94        durable::Database {
95            id: database.id,
96            oid: database.oid,
97            name: database.name,
98            owner_id: database.owner_id,
99            privileges: database.privileges.into_all_values().collect(),
100        }
101    }
102}
103
104impl From<durable::Database> for Database {
105    fn from(
106        durable::Database {
107            id,
108            oid,
109            name,
110            owner_id,
111            privileges,
112        }: durable::Database,
113    ) -> Database {
114        Database {
115            id,
116            oid,
117            schemas_by_id: BTreeMap::new(),
118            schemas_by_name: BTreeMap::new(),
119            name,
120            owner_id,
121            privileges: PrivilegeMap::from_mz_acl_items(privileges),
122        }
123    }
124}
125
126impl UpdateFrom<durable::Database> for Database {
127    fn update_from(
128        &mut self,
129        durable::Database {
130            id,
131            oid,
132            name,
133            owner_id,
134            privileges,
135        }: durable::Database,
136    ) {
137        self.id = id;
138        self.oid = oid;
139        self.name = name;
140        self.owner_id = owner_id;
141        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
142    }
143}
144
145#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
146pub struct Schema {
147    pub name: QualifiedSchemaName,
148    pub id: SchemaSpecifier,
149    pub oid: u32,
150    pub items: BTreeMap<String, CatalogItemId>,
151    pub functions: BTreeMap<String, CatalogItemId>,
152    pub types: BTreeMap<String, CatalogItemId>,
153    pub owner_id: RoleId,
154    pub privileges: PrivilegeMap,
155}
156
157impl From<Schema> for durable::Schema {
158    fn from(schema: Schema) -> durable::Schema {
159        durable::Schema {
160            id: schema.id.into(),
161            oid: schema.oid,
162            name: schema.name.schema,
163            database_id: schema.name.database.id(),
164            owner_id: schema.owner_id,
165            privileges: schema.privileges.into_all_values().collect(),
166        }
167    }
168}
169
170impl From<durable::Schema> for Schema {
171    fn from(
172        durable::Schema {
173            id,
174            oid,
175            name,
176            database_id,
177            owner_id,
178            privileges,
179        }: durable::Schema,
180    ) -> Schema {
181        Schema {
182            name: QualifiedSchemaName {
183                database: database_id.into(),
184                schema: name,
185            },
186            id: id.into(),
187            oid,
188            items: BTreeMap::new(),
189            functions: BTreeMap::new(),
190            types: BTreeMap::new(),
191            owner_id,
192            privileges: PrivilegeMap::from_mz_acl_items(privileges),
193        }
194    }
195}
196
197impl UpdateFrom<durable::Schema> for Schema {
198    fn update_from(
199        &mut self,
200        durable::Schema {
201            id,
202            oid,
203            name,
204            database_id,
205            owner_id,
206            privileges,
207        }: durable::Schema,
208    ) {
209        self.name = QualifiedSchemaName {
210            database: database_id.into(),
211            schema: name,
212        };
213        self.id = id.into();
214        self.oid = oid;
215        self.owner_id = owner_id;
216        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
217    }
218}
219
220#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
221pub struct Role {
222    pub name: String,
223    pub id: RoleId,
224    pub oid: u32,
225    pub attributes: RoleAttributes,
226    pub membership: RoleMembership,
227    pub vars: RoleVars,
228}
229
230impl Role {
231    pub fn is_user(&self) -> bool {
232        self.id.is_user()
233    }
234
235    pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
236        self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
237    }
238}
239
240impl From<Role> for durable::Role {
241    fn from(role: Role) -> durable::Role {
242        durable::Role {
243            id: role.id,
244            oid: role.oid,
245            name: role.name,
246            attributes: role.attributes,
247            membership: role.membership,
248            vars: role.vars,
249        }
250    }
251}
252
253impl From<durable::Role> for Role {
254    fn from(
255        durable::Role {
256            id,
257            oid,
258            name,
259            attributes,
260            membership,
261            vars,
262        }: durable::Role,
263    ) -> Self {
264        Role {
265            name,
266            id,
267            oid,
268            attributes,
269            membership,
270            vars,
271        }
272    }
273}
274
275impl UpdateFrom<durable::Role> for Role {
276    fn update_from(
277        &mut self,
278        durable::Role {
279            id,
280            oid,
281            name,
282            attributes,
283            membership,
284            vars,
285        }: durable::Role,
286    ) {
287        self.id = id;
288        self.oid = oid;
289        self.name = name;
290        self.attributes = attributes;
291        self.membership = membership;
292        self.vars = vars;
293    }
294}
295
296#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
297pub struct RoleAuth {
298    pub role_id: RoleId,
299    pub password_hash: Option<String>,
300    pub updated_at: u64,
301}
302
303impl From<RoleAuth> for durable::RoleAuth {
304    fn from(role_auth: RoleAuth) -> durable::RoleAuth {
305        durable::RoleAuth {
306            role_id: role_auth.role_id,
307            password_hash: role_auth.password_hash,
308            updated_at: role_auth.updated_at,
309        }
310    }
311}
312
313impl From<durable::RoleAuth> for RoleAuth {
314    fn from(
315        durable::RoleAuth {
316            role_id,
317            password_hash,
318            updated_at,
319        }: durable::RoleAuth,
320    ) -> RoleAuth {
321        RoleAuth {
322            role_id,
323            password_hash,
324            updated_at,
325        }
326    }
327}
328
329impl UpdateFrom<durable::RoleAuth> for RoleAuth {
330    fn update_from(&mut self, from: durable::RoleAuth) {
331        self.role_id = from.role_id;
332        self.password_hash = from.password_hash;
333    }
334}
335
336#[derive(Debug, Serialize, Clone, PartialEq)]
337pub struct Cluster {
338    pub name: String,
339    pub id: ClusterId,
340    pub config: ClusterConfig,
341    #[serde(skip)]
342    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
343    /// Objects bound to this cluster. Does not include introspection source
344    /// indexes.
345    pub bound_objects: BTreeSet<CatalogItemId>,
346    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
347    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
348    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
349    pub owner_id: RoleId,
350    pub privileges: PrivilegeMap,
351}
352
353impl Cluster {
354    /// The role of the cluster. Currently used to set alert severity.
355    pub fn role(&self) -> ClusterRole {
356        // NOTE - These roles power monitoring systems. Do not change
357        // them without talking to the cloud or observability groups.
358        if self.name == MZ_SYSTEM_CLUSTER.name {
359            ClusterRole::SystemCritical
360        } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
361            ClusterRole::System
362        } else {
363            ClusterRole::User
364        }
365    }
366
367    /// Returns `true` if the cluster is a managed cluster.
368    pub fn is_managed(&self) -> bool {
369        matches!(self.config.variant, ClusterVariant::Managed { .. })
370    }
371
372    /// Lists the user replicas, which are those that do not have the internal flag set.
373    pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
374        self.replicas().filter(|r| !r.config.location.internal())
375    }
376
377    /// Lists all replicas in the cluster
378    pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
379        self.replicas_by_id_.values()
380    }
381
382    /// Lookup a replica by ID.
383    pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
384        self.replicas_by_id_.get(&replica_id)
385    }
386
387    /// Lookup a replica ID by name.
388    pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
389        self.replica_id_by_name_.get(name).copied()
390    }
391
392    /// Returns the availability zones of this cluster, if they exist.
393    pub fn availability_zones(&self) -> Option<&[String]> {
394        match &self.config.variant {
395            ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
396            ClusterVariant::Unmanaged => None,
397        }
398    }
399
400    pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
401        let name = self.name.clone();
402        let variant = match &self.config.variant {
403            ClusterVariant::Managed(ClusterVariantManaged {
404                size,
405                availability_zones,
406                logging,
407                replication_factor,
408                optimizer_feature_overrides,
409                schedule,
410            }) => {
411                let introspection = match logging {
412                    ReplicaLogging {
413                        log_logging,
414                        interval: Some(interval),
415                    } => Some(ComputeReplicaIntrospectionConfig {
416                        debugging: *log_logging,
417                        interval: interval.clone(),
418                    }),
419                    ReplicaLogging {
420                        log_logging: _,
421                        interval: None,
422                    } => None,
423                };
424                let compute = ComputeReplicaConfig { introspection };
425                CreateClusterVariant::Managed(CreateClusterManagedPlan {
426                    replication_factor: replication_factor.clone(),
427                    size: size.clone(),
428                    availability_zones: availability_zones.clone(),
429                    compute,
430                    optimizer_feature_overrides: optimizer_feature_overrides.clone(),
431                    schedule: schedule.clone(),
432                })
433            }
434            ClusterVariant::Unmanaged => {
435                // Unmanaged clusters are deprecated, so hopefully we can remove
436                // them before we have to implement this.
437                return Err(PlanError::Unsupported {
438                    feature: "SHOW CREATE for unmanaged clusters".to_string(),
439                    discussion_no: None,
440                });
441            }
442        };
443        let workload_class = self.config.workload_class.clone();
444        Ok(CreateClusterPlan {
445            name,
446            variant,
447            workload_class,
448        })
449    }
450}
451
452impl From<Cluster> for durable::Cluster {
453    fn from(cluster: Cluster) -> durable::Cluster {
454        durable::Cluster {
455            id: cluster.id,
456            name: cluster.name,
457            owner_id: cluster.owner_id,
458            privileges: cluster.privileges.into_all_values().collect(),
459            config: cluster.config.into(),
460        }
461    }
462}
463
464impl From<durable::Cluster> for Cluster {
465    fn from(
466        durable::Cluster {
467            id,
468            name,
469            owner_id,
470            privileges,
471            config,
472        }: durable::Cluster,
473    ) -> Self {
474        Cluster {
475            name: name.clone(),
476            id,
477            bound_objects: BTreeSet::new(),
478            log_indexes: BTreeMap::new(),
479            replica_id_by_name_: BTreeMap::new(),
480            replicas_by_id_: BTreeMap::new(),
481            owner_id,
482            privileges: PrivilegeMap::from_mz_acl_items(privileges),
483            config: config.into(),
484        }
485    }
486}
487
488impl UpdateFrom<durable::Cluster> for Cluster {
489    fn update_from(
490        &mut self,
491        durable::Cluster {
492            id,
493            name,
494            owner_id,
495            privileges,
496            config,
497        }: durable::Cluster,
498    ) {
499        self.id = id;
500        self.name = name;
501        self.owner_id = owner_id;
502        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
503        self.config = config.into();
504    }
505}
506
507#[derive(Debug, Serialize, Clone, PartialEq)]
508pub struct ClusterReplica {
509    pub name: String,
510    pub cluster_id: ClusterId,
511    pub replica_id: ReplicaId,
512    pub config: ReplicaConfig,
513    pub owner_id: RoleId,
514}
515
516impl From<ClusterReplica> for durable::ClusterReplica {
517    fn from(replica: ClusterReplica) -> durable::ClusterReplica {
518        durable::ClusterReplica {
519            cluster_id: replica.cluster_id,
520            replica_id: replica.replica_id,
521            name: replica.name,
522            config: replica.config.into(),
523            owner_id: replica.owner_id,
524        }
525    }
526}
527
528#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
529pub struct ClusterReplicaProcessStatus {
530    pub status: ClusterStatus,
531    pub time: DateTime<Utc>,
532}
533
534#[derive(Debug, Serialize, Clone, PartialEq)]
535pub struct SourceReferences {
536    pub updated_at: u64,
537    pub references: Vec<SourceReference>,
538}
539
540#[derive(Debug, Serialize, Clone, PartialEq)]
541pub struct SourceReference {
542    pub name: String,
543    pub namespace: Option<String>,
544    pub columns: Vec<String>,
545}
546
547impl From<SourceReference> for durable::SourceReference {
548    fn from(source_reference: SourceReference) -> durable::SourceReference {
549        durable::SourceReference {
550            name: source_reference.name,
551            namespace: source_reference.namespace,
552            columns: source_reference.columns,
553        }
554    }
555}
556
557impl SourceReferences {
558    pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
559        durable::SourceReferences {
560            source_id,
561            updated_at: self.updated_at,
562            references: self.references.into_iter().map(Into::into).collect(),
563        }
564    }
565}
566
567impl From<durable::SourceReference> for SourceReference {
568    fn from(source_reference: durable::SourceReference) -> SourceReference {
569        SourceReference {
570            name: source_reference.name,
571            namespace: source_reference.namespace,
572            columns: source_reference.columns,
573        }
574    }
575}
576
577impl From<durable::SourceReferences> for SourceReferences {
578    fn from(source_references: durable::SourceReferences) -> SourceReferences {
579        SourceReferences {
580            updated_at: source_references.updated_at,
581            references: source_references
582                .references
583                .into_iter()
584                .map(|source_reference| source_reference.into())
585                .collect(),
586        }
587    }
588}
589
590impl From<mz_sql::plan::SourceReference> for SourceReference {
591    fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
592        SourceReference {
593            name: source_reference.name,
594            namespace: source_reference.namespace,
595            columns: source_reference.columns,
596        }
597    }
598}
599
600impl From<mz_sql::plan::SourceReferences> for SourceReferences {
601    fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
602        SourceReferences {
603            updated_at: source_references.updated_at,
604            references: source_references
605                .references
606                .into_iter()
607                .map(|source_reference| source_reference.into())
608                .collect(),
609        }
610    }
611}
612
613impl From<SourceReferences> for mz_sql::plan::SourceReferences {
614    fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
615        mz_sql::plan::SourceReferences {
616            updated_at: source_references.updated_at,
617            references: source_references
618                .references
619                .into_iter()
620                .map(|source_reference| source_reference.into())
621                .collect(),
622        }
623    }
624}
625
626impl From<SourceReference> for mz_sql::plan::SourceReference {
627    fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
628        mz_sql::plan::SourceReference {
629            name: source_reference.name,
630            namespace: source_reference.namespace,
631            columns: source_reference.columns,
632        }
633    }
634}
635
636#[derive(Clone, Debug, Serialize)]
637pub struct CatalogEntry {
638    pub item: CatalogItem,
639    #[serde(skip)]
640    pub referenced_by: Vec<CatalogItemId>,
641    // TODO(database-issues#7922)––this should have an invariant tied to it that all
642    // dependents (i.e. entries in this field) have IDs greater than this
643    // entry's ID.
644    #[serde(skip)]
645    pub used_by: Vec<CatalogItemId>,
646    pub id: CatalogItemId,
647    pub oid: u32,
648    pub name: QualifiedItemName,
649    pub owner_id: RoleId,
650    pub privileges: PrivilegeMap,
651}
652
653/// A [`CatalogEntry`] that is associated with a specific "collection" of data.
654/// A single item in the catalog may be associated with multiple "collections".
655///
656/// Here "collection" generally means a pTVC, e.g. a Persist Shard, an Index, a
657/// currently running dataflow, etc.
658///
659/// Items in the Catalog have a stable name -> ID mapping, in other words for
660/// the entire lifetime of an object its [`CatalogItemId`] will _never_ change.
661/// Similarly, we need to maintain a stable mapping from [`GlobalId`] to pTVC.
662/// This presents a challenge when `ALTER`-ing an object, e.g. adding columns
663/// to a table. We can't just change the schema of the underlying Persist Shard
664/// because that would be rebinding the [`GlobalId`] of the pTVC. Instead we
665/// allocate a new [`GlobalId`] to refer to the new version of the table, and
666/// then the [`CatalogEntry`] tracks the [`GlobalId`] for each version.
667///
668/// TODO(ct): Add a note here if we end up using this for associating continual
669/// tasks with a single catalog item.
670#[derive(Clone, Debug)]
671pub struct CatalogCollectionEntry {
672    pub entry: CatalogEntry,
673    pub version: RelationVersionSelector,
674}
675
676impl CatalogCollectionEntry {
677    pub fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
678        self.item().desc(name, self.version)
679    }
680
681    pub fn desc_opt(&self) -> Option<Cow<'_, RelationDesc>> {
682        self.item().desc_opt(self.version)
683    }
684}
685
686impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
687    fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
688        CatalogCollectionEntry::desc(self, name)
689    }
690
691    fn global_id(&self) -> GlobalId {
692        match self.entry.item() {
693            CatalogItem::Source(source) => source.global_id,
694            CatalogItem::Log(log) => log.global_id,
695            CatalogItem::View(view) => view.global_id,
696            CatalogItem::MaterializedView(mv) => mv.global_id,
697            CatalogItem::Sink(sink) => sink.global_id,
698            CatalogItem::Index(index) => index.global_id,
699            CatalogItem::Type(ty) => ty.global_id,
700            CatalogItem::Func(func) => func.global_id,
701            CatalogItem::Secret(secret) => secret.global_id,
702            CatalogItem::Connection(conn) => conn.global_id,
703            CatalogItem::ContinualTask(ct) => ct.global_id,
704            CatalogItem::Table(table) => match self.version {
705                RelationVersionSelector::Latest => {
706                    let (_version, gid) = table
707                        .collections
708                        .last_key_value()
709                        .expect("at least one version");
710                    *gid
711                }
712                RelationVersionSelector::Specific(version) => table
713                    .collections
714                    .get(&version)
715                    .expect("catalog corruption, missing version!")
716                    .clone(),
717            },
718        }
719    }
720}
721
722impl Deref for CatalogCollectionEntry {
723    type Target = CatalogEntry;
724
725    fn deref(&self) -> &CatalogEntry {
726        &self.entry
727    }
728}
729
730impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
731    fn name(&self) -> &QualifiedItemName {
732        self.entry.name()
733    }
734
735    fn id(&self) -> CatalogItemId {
736        self.entry.id()
737    }
738
739    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
740        Box::new(self.entry.global_ids())
741    }
742
743    fn oid(&self) -> u32 {
744        self.entry.oid()
745    }
746
747    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
748        self.entry.func()
749    }
750
751    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
752        self.entry.source_desc()
753    }
754
755    fn connection(
756        &self,
757    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
758    {
759        mz_sql::catalog::CatalogItem::connection(&self.entry)
760    }
761
762    fn create_sql(&self) -> &str {
763        self.entry.create_sql()
764    }
765
766    fn item_type(&self) -> SqlCatalogItemType {
767        self.entry.item_type()
768    }
769
770    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
771        self.entry.index_details()
772    }
773
774    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
775        self.entry.writable_table_details()
776    }
777
778    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
779        self.entry.type_details()
780    }
781
782    fn references(&self) -> &ResolvedIds {
783        self.entry.references()
784    }
785
786    fn uses(&self) -> BTreeSet<CatalogItemId> {
787        self.entry.uses()
788    }
789
790    fn referenced_by(&self) -> &[CatalogItemId] {
791        self.entry.referenced_by()
792    }
793
794    fn used_by(&self) -> &[CatalogItemId] {
795        self.entry.used_by()
796    }
797
798    fn subsource_details(
799        &self,
800    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
801        self.entry.subsource_details()
802    }
803
804    fn source_export_details(
805        &self,
806    ) -> Option<(
807        CatalogItemId,
808        &UnresolvedItemName,
809        &SourceExportDetails,
810        &SourceExportDataConfig<ReferencedConnection>,
811    )> {
812        self.entry.source_export_details()
813    }
814
815    fn is_progress_source(&self) -> bool {
816        self.entry.is_progress_source()
817    }
818
819    fn progress_id(&self) -> Option<CatalogItemId> {
820        self.entry.progress_id()
821    }
822
823    fn owner_id(&self) -> RoleId {
824        *self.entry.owner_id()
825    }
826
827    fn privileges(&self) -> &PrivilegeMap {
828        self.entry.privileges()
829    }
830
831    fn cluster_id(&self) -> Option<ClusterId> {
832        self.entry.item().cluster_id()
833    }
834
835    fn at_version(
836        &self,
837        version: RelationVersionSelector,
838    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
839        Box::new(CatalogCollectionEntry {
840            entry: self.entry.clone(),
841            version,
842        })
843    }
844
845    fn latest_version(&self) -> Option<RelationVersion> {
846        self.entry.latest_version()
847    }
848}
849
850#[derive(Debug, Clone, Serialize)]
851pub enum CatalogItem {
852    Table(Table),
853    Source(Source),
854    Log(Log),
855    View(View),
856    MaterializedView(MaterializedView),
857    Sink(Sink),
858    Index(Index),
859    Type(Type),
860    Func(Func),
861    Secret(Secret),
862    Connection(Connection),
863    ContinualTask(ContinualTask),
864}
865
866impl From<CatalogEntry> for durable::Item {
867    fn from(entry: CatalogEntry) -> durable::Item {
868        let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
869        durable::Item {
870            id: entry.id,
871            oid: entry.oid,
872            global_id,
873            schema_id: entry.name.qualifiers.schema_spec.into(),
874            name: entry.name.item,
875            create_sql,
876            owner_id: entry.owner_id,
877            privileges: entry.privileges.into_all_values().collect(),
878            extra_versions,
879        }
880    }
881}
882
883#[derive(Debug, Clone, Serialize)]
884pub struct Table {
885    /// Parse-able SQL that defines this table.
886    pub create_sql: Option<String>,
887    /// [`VersionedRelationDesc`] of this table, derived from the `create_sql`.
888    pub desc: VersionedRelationDesc,
889    /// Versions of this table, and the [`GlobalId`]s that refer to them.
890    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
891    pub collections: BTreeMap<RelationVersion, GlobalId>,
892    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
893    #[serde(skip)]
894    pub conn_id: Option<ConnectionId>,
895    /// Other catalog objects referenced by this table, e.g. custom types.
896    pub resolved_ids: ResolvedIds,
897    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
898    pub custom_logical_compaction_window: Option<CompactionWindow>,
899    /// Whether the table's logical compaction window is controlled by the ['metrics_retention']
900    /// session variable.
901    ///
902    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
903    pub is_retained_metrics_object: bool,
904    /// Where data for this table comes from, e.g. `INSERT` statements or an upstream source.
905    pub data_source: TableDataSource,
906}
907
908impl Table {
909    pub fn timeline(&self) -> Timeline {
910        match &self.data_source {
911            // The Coordinator controls insertions for writable tables
912            // (including system tables), so they are realtime.
913            TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
914            TableDataSource::DataSource { timeline, .. } => timeline.clone(),
915        }
916    }
917
918    /// Returns all of the [`GlobalId`]s that this [`Table`] can be referenced by.
919    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
920        self.collections.values().copied()
921    }
922
923    /// Returns the latest [`GlobalId`] for this [`Table`] which should be used for writes.
924    pub fn global_id_writes(&self) -> GlobalId {
925        self.collections
926            .last_key_value()
927            .expect("at least one version of a table")
928            .1
929            .clone()
930    }
931
932    /// Returns all of the collections and their [`RelationDesc`]s associated with this [`Table`].
933    pub fn collection_descs(
934        &self,
935    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
936        self.collections.iter().map(|(version, gid)| {
937            let desc = self
938                .desc
939                .at_version(RelationVersionSelector::Specific(*version));
940            (*gid, *version, desc)
941        })
942    }
943
944    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
945    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
946        let (version, _gid) = self
947            .collections
948            .iter()
949            .find(|(_version, gid)| *gid == id)
950            .expect("GlobalId to exist");
951        self.desc
952            .at_version(RelationVersionSelector::Specific(*version))
953    }
954}
955
956#[derive(Clone, Debug, Serialize)]
957pub enum TableDataSource {
958    /// The table owns data created via INSERT/UPDATE/DELETE statements.
959    TableWrites {
960        #[serde(skip)]
961        defaults: Vec<Expr<Aug>>,
962    },
963
964    /// The table receives its data from the identified `DataSourceDesc`.
965    /// This table type does not support INSERT/UPDATE/DELETE statements.
966    DataSource {
967        desc: DataSourceDesc,
968        timeline: Timeline,
969    },
970}
971
972#[derive(Debug, Clone, Serialize)]
973pub enum DataSourceDesc {
974    /// Receives data from an external system
975    Ingestion {
976        desc: SourceDesc<ReferencedConnection>,
977        cluster_id: ClusterId,
978    },
979    /// Receives data from an external system
980    OldSyntaxIngestion {
981        desc: SourceDesc<ReferencedConnection>,
982        cluster_id: ClusterId,
983        // If we're dealing with an old syntax ingestion the progress id will be some other collection
984        // and the ingestion itself will have the data from an external reference
985        progress_subsource: CatalogItemId,
986        data_config: SourceExportDataConfig<ReferencedConnection>,
987        details: SourceExportDetails,
988    },
989    /// This source receives its data from the identified ingestion,
990    /// specifically the output identified by `external_reference`.
991    /// N.B. that `external_reference` should not be used to identify
992    /// anything downstream of purification, as the purification process
993    /// encodes source-specific identifiers into the `details` struct.
994    /// The `external_reference` field is only used here for displaying
995    /// human-readable names in system tables.
996    IngestionExport {
997        ingestion_id: CatalogItemId,
998        external_reference: UnresolvedItemName,
999        details: SourceExportDetails,
1000        data_config: SourceExportDataConfig<ReferencedConnection>,
1001    },
1002    /// Receives introspection data from an internal system
1003    Introspection(IntrospectionType),
1004    /// Receives data from the source's reclocking/remapping operations.
1005    Progress,
1006    /// Receives data from HTTP requests.
1007    Webhook {
1008        /// Optional components used to validation a webhook request.
1009        validate_using: Option<WebhookValidation>,
1010        /// Describes how we deserialize the body of a webhook request.
1011        body_format: WebhookBodyFormat,
1012        /// Describes whether or not to include headers and how to map them.
1013        headers: WebhookHeaders,
1014        /// The cluster which this source is associated with.
1015        cluster_id: ClusterId,
1016    },
1017}
1018
1019impl DataSourceDesc {
1020    /// The key and value formats of the data source.
1021    pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1022        match &self {
1023            DataSourceDesc::Ingestion { .. } => (None, None),
1024            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1025                match &data_config.encoding.as_ref() {
1026                    Some(encoding) => match &encoding.key {
1027                        Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1028                        None => (None, Some(encoding.value.type_())),
1029                    },
1030                    None => (None, None),
1031                }
1032            }
1033            DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1034                Some(encoding) => match &encoding.key {
1035                    Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1036                    None => (None, Some(encoding.value.type_())),
1037                },
1038                None => (None, None),
1039            },
1040            DataSourceDesc::Introspection(_)
1041            | DataSourceDesc::Webhook { .. }
1042            | DataSourceDesc::Progress => (None, None),
1043        }
1044    }
1045
1046    /// Envelope of the data source.
1047    pub fn envelope(&self) -> Option<&str> {
1048        // Note how "none"/"append-only" is different from `None`. Source
1049        // sources don't have an envelope (internal logs, for example), while
1050        // other sources have an envelope that we call the "NONE"-envelope.
1051
1052        fn envelope_string(envelope: &SourceEnvelope) -> &str {
1053            match envelope {
1054                SourceEnvelope::None(_) => "none",
1055                SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1056                    mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1057                    mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1058                        // NOTE(aljoscha): Should we somehow mark that this is
1059                        // using upsert internally? See note above about
1060                        // DEBEZIUM.
1061                        "debezium"
1062                    }
1063                    mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1064                        "upsert-value-err-inline"
1065                    }
1066                },
1067                SourceEnvelope::CdcV2 => {
1068                    // TODO(aljoscha): Should we even report this? It's
1069                    // currently not exposed.
1070                    "materialize"
1071                }
1072            }
1073        }
1074
1075        match self {
1076            // NOTE(aljoscha): We could move the block for ingestions into
1077            // `SourceEnvelope` itself, but that one feels more like an internal
1078            // thing and adapter should own how we represent envelopes as a
1079            // string? It would not be hard to convince me otherwise, though.
1080            DataSourceDesc::Ingestion { .. } => None,
1081            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1082                Some(envelope_string(&data_config.envelope))
1083            }
1084            DataSourceDesc::IngestionExport { data_config, .. } => {
1085                Some(envelope_string(&data_config.envelope))
1086            }
1087            DataSourceDesc::Introspection(_)
1088            | DataSourceDesc::Webhook { .. }
1089            | DataSourceDesc::Progress => None,
1090        }
1091    }
1092}
1093
1094#[derive(Debug, Clone, Serialize)]
1095pub struct Source {
1096    /// Parse-able SQL that defines this table.
1097    pub create_sql: Option<String>,
1098    /// [`GlobalId`] used to reference this source from outside the catalog.
1099    pub global_id: GlobalId,
1100    // TODO: Unskip: currently blocked on some inner BTreeMap<X, _> problems.
1101    #[serde(skip)]
1102    pub data_source: DataSourceDesc,
1103    /// [`RelationDesc`] of this source, derived from the `create_sql`.
1104    pub desc: RelationDesc,
1105    /// The timeline this source exists on.
1106    pub timeline: Timeline,
1107    /// Other catalog objects referenced by this table, e.g. custom types.
1108    pub resolved_ids: ResolvedIds,
1109    /// This value is ignored for subsources, i.e. for
1110    /// [`DataSourceDesc::IngestionExport`]. Instead, it uses the primary
1111    /// sources logical compaction window.
1112    pub custom_logical_compaction_window: Option<CompactionWindow>,
1113    /// Whether the source's logical compaction window is controlled by
1114    /// METRICS_RETENTION
1115    pub is_retained_metrics_object: bool,
1116}
1117
1118impl Source {
1119    /// Creates a new `Source`.
1120    ///
1121    /// # Panics
1122    /// - If an ingestion-based plan is not given a cluster_id.
1123    /// - If a non-ingestion-based source has a defined cluster config in its plan.
1124    /// - If a non-ingestion-based source is given a cluster_id.
1125    pub fn new(
1126        plan: CreateSourcePlan,
1127        global_id: GlobalId,
1128        resolved_ids: ResolvedIds,
1129        custom_logical_compaction_window: Option<CompactionWindow>,
1130        is_retained_metrics_object: bool,
1131    ) -> Source {
1132        Source {
1133            create_sql: Some(plan.source.create_sql),
1134            data_source: match plan.source.data_source {
1135                mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1136                    desc,
1137                    cluster_id: plan
1138                        .in_cluster
1139                        .expect("ingestion-based sources must be given a cluster ID"),
1140                },
1141                mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1142                    desc,
1143                    progress_subsource,
1144                    data_config,
1145                    details,
1146                } => DataSourceDesc::OldSyntaxIngestion {
1147                    desc,
1148                    cluster_id: plan
1149                        .in_cluster
1150                        .expect("ingestion-based sources must be given a cluster ID"),
1151                    progress_subsource,
1152                    data_config,
1153                    details,
1154                },
1155                mz_sql::plan::DataSourceDesc::Progress => {
1156                    assert!(
1157                        plan.in_cluster.is_none(),
1158                        "subsources must not have a host config or cluster_id defined"
1159                    );
1160                    DataSourceDesc::Progress
1161                }
1162                mz_sql::plan::DataSourceDesc::IngestionExport {
1163                    ingestion_id,
1164                    external_reference,
1165                    details,
1166                    data_config,
1167                } => {
1168                    assert!(
1169                        plan.in_cluster.is_none(),
1170                        "subsources must not have a host config or cluster_id defined"
1171                    );
1172                    DataSourceDesc::IngestionExport {
1173                        ingestion_id,
1174                        external_reference,
1175                        details,
1176                        data_config,
1177                    }
1178                }
1179                mz_sql::plan::DataSourceDesc::Webhook {
1180                    validate_using,
1181                    body_format,
1182                    headers,
1183                    cluster_id,
1184                } => {
1185                    mz_ore::soft_assert_or_log!(
1186                        cluster_id.is_none(),
1187                        "cluster_id set at Source level for Webhooks"
1188                    );
1189                    DataSourceDesc::Webhook {
1190                        validate_using,
1191                        body_format,
1192                        headers,
1193                        cluster_id: plan
1194                            .in_cluster
1195                            .expect("webhook sources must be given a cluster ID"),
1196                    }
1197                }
1198            },
1199            desc: plan.source.desc,
1200            global_id,
1201            timeline: plan.timeline,
1202            resolved_ids,
1203            custom_logical_compaction_window: plan
1204                .source
1205                .compaction_window
1206                .or(custom_logical_compaction_window),
1207            is_retained_metrics_object,
1208        }
1209    }
1210
1211    /// Type of the source.
1212    pub fn source_type(&self) -> &str {
1213        match &self.data_source {
1214            DataSourceDesc::Ingestion { desc, .. }
1215            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
1216            DataSourceDesc::Progress => "progress",
1217            DataSourceDesc::IngestionExport { .. } => "subsource",
1218            DataSourceDesc::Introspection(_) => "source",
1219            DataSourceDesc::Webhook { .. } => "webhook",
1220        }
1221    }
1222
1223    /// Connection ID of the source, if one exists.
1224    pub fn connection_id(&self) -> Option<CatalogItemId> {
1225        match &self.data_source {
1226            DataSourceDesc::Ingestion { desc, .. }
1227            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
1228            DataSourceDesc::IngestionExport { .. }
1229            | DataSourceDesc::Introspection(_)
1230            | DataSourceDesc::Webhook { .. }
1231            | DataSourceDesc::Progress => None,
1232        }
1233    }
1234
1235    /// The single [`GlobalId`] that refers to this Source.
1236    pub fn global_id(&self) -> GlobalId {
1237        self.global_id
1238    }
1239
1240    /// The expensive resource that each source consumes is persist shards. To
1241    /// prevent abuse, we want to prevent users from creating sources that use an
1242    /// unbounded number of persist shards. But we also don't want to count
1243    /// persist shards that are mandated by teh system (e.g., the progress
1244    /// shard) so that future versions of Materialize can introduce additional
1245    /// per-source shards (e.g., a per-source status shard) without impacting
1246    /// the limit calculation.
1247    pub fn user_controllable_persist_shard_count(&self) -> i64 {
1248        match &self.data_source {
1249            DataSourceDesc::Ingestion { .. } => 0,
1250            DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1251                match &desc.connection {
1252                    // These multi-output sources do not use their primary
1253                    // source's data shard, so we don't include it in accounting
1254                    // for users.
1255                    GenericSourceConnection::Postgres(_)
1256                    | GenericSourceConnection::MySql(_)
1257                    | GenericSourceConnection::SqlServer(_) => 0,
1258                    GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
1259                        // Load generators that output data in their primary shard
1260                        LoadGenerator::Clock
1261                        | LoadGenerator::Counter { .. }
1262                        | LoadGenerator::Datums
1263                        | LoadGenerator::KeyValue(_) => 1,
1264                        LoadGenerator::Auction
1265                        | LoadGenerator::Marketing
1266                        | LoadGenerator::Tpch { .. } => 0,
1267                    },
1268                    GenericSourceConnection::Kafka(_) => 1,
1269                }
1270            }
1271            //  DataSourceDesc::IngestionExport represents a subsource, which
1272            //  use a data shard.
1273            DataSourceDesc::IngestionExport { .. } => 1,
1274            DataSourceDesc::Webhook { .. } => 1,
1275            // Introspection and progress subsources are not under the user's control, so shouldn't
1276            // count toward their quota.
1277            DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => 0,
1278        }
1279    }
1280}
1281
1282#[derive(Debug, Clone, Serialize)]
1283pub struct Log {
1284    /// The category of data this log stores.
1285    pub variant: LogVariant,
1286    /// [`GlobalId`] used to reference this log from outside the catalog.
1287    pub global_id: GlobalId,
1288}
1289
1290impl Log {
1291    /// The single [`GlobalId`] that refers to this Log.
1292    pub fn global_id(&self) -> GlobalId {
1293        self.global_id
1294    }
1295}
1296
1297#[derive(Debug, Clone, Serialize)]
1298pub struct Sink {
1299    /// Parse-able SQL that defines this sink.
1300    pub create_sql: String,
1301    /// [`GlobalId`] used to reference this sink from outside the catalog, e.g storage.
1302    pub global_id: GlobalId,
1303    /// Collection we read into this sink.
1304    pub from: GlobalId,
1305    /// Connection to the external service we're sinking into, e.g. Kafka.
1306    pub connection: StorageSinkConnection<ReferencedConnection>,
1307    /// Envelope we use to sink into the external system.
1308    ///
1309    /// TODO(guswynn): this probably should just be in the `connection`.
1310    pub envelope: SinkEnvelope,
1311    /// Emit an initial snapshot into the sink.
1312    pub with_snapshot: bool,
1313    /// Used to fence other writes into this sink as we evolve the upstream materialized view.
1314    pub version: u64,
1315    /// Other catalog objects this sink references.
1316    pub resolved_ids: ResolvedIds,
1317    /// Cluster this sink runs on.
1318    pub cluster_id: ClusterId,
1319}
1320
1321impl Sink {
1322    pub fn sink_type(&self) -> &str {
1323        self.connection.name()
1324    }
1325
1326    /// Envelope of the sink.
1327    pub fn envelope(&self) -> Option<&str> {
1328        match &self.envelope {
1329            SinkEnvelope::Debezium => Some("debezium"),
1330            SinkEnvelope::Upsert => Some("upsert"),
1331        }
1332    }
1333
1334    /// Output a combined format string of the sink. For legacy reasons
1335    /// if the key-format is none or the key & value formats are
1336    /// both the same (either avro or json), we return the value format name,
1337    /// otherwise we return a composite name.
1338    pub fn combined_format(&self) -> Cow<'_, str> {
1339        let StorageSinkConnection::Kafka(connection) = &self.connection;
1340        connection.format.get_format_name()
1341    }
1342
1343    /// Output distinct key_format and value_format of the sink.
1344    pub fn formats(&self) -> (Option<&str>, &str) {
1345        let StorageSinkConnection::Kafka(connection) = &self.connection;
1346        let key_format = connection
1347            .format
1348            .key_format
1349            .as_ref()
1350            .map(|format| format.get_format_name());
1351        let value_format = connection.format.value_format.get_format_name();
1352        (key_format, value_format)
1353    }
1354
1355    pub fn connection_id(&self) -> Option<CatalogItemId> {
1356        self.connection.connection_id()
1357    }
1358
1359    /// The single [`GlobalId`] that this Sink can be referenced by.
1360    pub fn global_id(&self) -> GlobalId {
1361        self.global_id
1362    }
1363}
1364
1365#[derive(Debug, Clone, Serialize)]
1366pub struct View {
1367    /// Parse-able SQL that defines this view.
1368    pub create_sql: String,
1369    /// [`GlobalId`] used to reference this view from outside the catalog, e.g. compute.
1370    pub global_id: GlobalId,
1371    /// Unoptimized high-level expression from parsing the `create_sql`.
1372    pub raw_expr: Arc<HirRelationExpr>,
1373    /// Optimized mid-level expression from (locally) optimizing the `raw_expr`.
1374    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
1375    /// Columns of this view.
1376    pub desc: RelationDesc,
1377    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
1378    pub conn_id: Option<ConnectionId>,
1379    /// Other catalog objects that are referenced by this view, determined at name resolution.
1380    pub resolved_ids: ResolvedIds,
1381    /// All of the catalog objects that are referenced by this view.
1382    pub dependencies: DependencyIds,
1383}
1384
1385impl View {
1386    /// The single [`GlobalId`] this [`View`] can be referenced by.
1387    pub fn global_id(&self) -> GlobalId {
1388        self.global_id
1389    }
1390}
1391
1392#[derive(Debug, Clone, Serialize)]
1393pub struct MaterializedView {
1394    /// Parse-able SQL that defines this materialized view.
1395    pub create_sql: String,
1396    /// [`GlobalId`] used to reference this materialized view from outside the catalog.
1397    pub global_id: GlobalId,
1398    /// Raw high-level expression from planning, derived from the `create_sql`.
1399    pub raw_expr: Arc<HirRelationExpr>,
1400    /// Optimized mid-level expression, derived from the `raw_expr`.
1401    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
1402    /// Columns for this materialized view.
1403    pub desc: RelationDesc,
1404    /// Other catalog items that this materialized view references, determined at name resolution.
1405    pub resolved_ids: ResolvedIds,
1406    /// All of the catalog objects that are referenced by this view.
1407    pub dependencies: DependencyIds,
1408    /// Cluster that this materialized view runs on.
1409    pub cluster_id: ClusterId,
1410    /// Column indexes that we assert are not `NULL`.
1411    ///
1412    /// TODO(parkmycar): Switch this to use the `ColumnIdx` type.
1413    pub non_null_assertions: Vec<usize>,
1414    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
1415    pub custom_logical_compaction_window: Option<CompactionWindow>,
1416    /// Schedule to refresh this materialized view, e.g. set via `REFRESH EVERY` option.
1417    pub refresh_schedule: Option<RefreshSchedule>,
1418    /// The initial `as_of` of the storage collection associated with the materialized view.
1419    ///
1420    /// Note: This doesn't change upon restarts.
1421    /// (The dataflow's initial `as_of` can be different.)
1422    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
1423}
1424
1425impl MaterializedView {
1426    /// The single [`GlobalId`] this [`MaterializedView`] can be referenced by.
1427    pub fn global_id(&self) -> GlobalId {
1428        self.global_id
1429    }
1430}
1431
1432#[derive(Debug, Clone, Serialize)]
1433pub struct Index {
1434    /// Parse-able SQL that defines this table.
1435    pub create_sql: String,
1436    /// [`GlobalId`] used to reference this index from outside the catalog, e.g. compute.
1437    pub global_id: GlobalId,
1438    /// The [`GlobalId`] this Index is on.
1439    pub on: GlobalId,
1440    /// Keys of the index.
1441    pub keys: Arc<[MirScalarExpr]>,
1442    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
1443    pub conn_id: Option<ConnectionId>,
1444    /// Other catalog objects referenced by this index, e.g. the object we're indexing.
1445    pub resolved_ids: ResolvedIds,
1446    /// Cluster this index is installed on.
1447    pub cluster_id: ClusterId,
1448    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
1449    pub custom_logical_compaction_window: Option<CompactionWindow>,
1450    /// Whether the table's logical compaction window is controlled by the ['metrics_retention']
1451    /// session variable.
1452    ///
1453    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
1454    pub is_retained_metrics_object: bool,
1455}
1456
1457impl Index {
1458    /// The [`GlobalId`] that refers to this Index.
1459    pub fn global_id(&self) -> GlobalId {
1460        self.global_id
1461    }
1462}
1463
1464#[derive(Debug, Clone, Serialize)]
1465pub struct Type {
1466    /// Parse-able SQL that defines this type.
1467    pub create_sql: Option<String>,
1468    /// [`GlobalId`] used to reference this type from outside the catalog.
1469    pub global_id: GlobalId,
1470    #[serde(skip)]
1471    pub details: CatalogTypeDetails<IdReference>,
1472    pub desc: Option<RelationDesc>,
1473    /// Other catalog objects referenced by this type.
1474    pub resolved_ids: ResolvedIds,
1475}
1476
1477#[derive(Debug, Clone, Serialize)]
1478pub struct Func {
1479    /// Static definition of the function.
1480    #[serde(skip)]
1481    pub inner: &'static mz_sql::func::Func,
1482    /// [`GlobalId`] used to reference this function from outside the catalog.
1483    pub global_id: GlobalId,
1484}
1485
1486#[derive(Debug, Clone, Serialize)]
1487pub struct Secret {
1488    /// Parse-able SQL that defines this secret.
1489    pub create_sql: String,
1490    /// [`GlobalId`] used to reference this secret from outside the catalog.
1491    pub global_id: GlobalId,
1492}
1493
1494#[derive(Debug, Clone, Serialize)]
1495pub struct Connection {
1496    /// Parse-able SQL that defines this connection.
1497    pub create_sql: String,
1498    /// [`GlobalId`] used to reference this connection from the storage layer.
1499    pub global_id: GlobalId,
1500    /// The kind of connection.
1501    pub details: ConnectionDetails,
1502    /// Other objects this connection depends on.
1503    pub resolved_ids: ResolvedIds,
1504}
1505
1506impl Connection {
1507    /// The single [`GlobalId`] used to reference this connection.
1508    pub fn global_id(&self) -> GlobalId {
1509        self.global_id
1510    }
1511}
1512
1513#[derive(Debug, Clone, Serialize)]
1514pub struct ContinualTask {
1515    /// Parse-able SQL that defines this continual task.
1516    pub create_sql: String,
1517    /// [`GlobalId`] used to reference this continual task from outside the catalog.
1518    pub global_id: GlobalId,
1519    /// [`GlobalId`] of the collection that we read into this continual task.
1520    pub input_id: GlobalId,
1521    pub with_snapshot: bool,
1522    /// ContinualTasks are self-referential. We make this work by using a
1523    /// placeholder `LocalId` for the CT itself through name resolution and
1524    /// planning. Then we fill in the real `GlobalId` before constructing this
1525    /// catalog item.
1526    pub raw_expr: Arc<HirRelationExpr>,
1527    /// Columns for this continual task.
1528    pub desc: RelationDesc,
1529    /// Other catalog items that this continual task references, determined at name resolution.
1530    pub resolved_ids: ResolvedIds,
1531    /// All of the catalog objects that are referenced by this continual task.
1532    pub dependencies: DependencyIds,
1533    /// Cluster that this continual task runs on.
1534    pub cluster_id: ClusterId,
1535    /// See the comment on [MaterializedView::initial_as_of].
1536    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
1537}
1538
1539impl ContinualTask {
1540    /// The single [`GlobalId`] used to reference this continual task.
1541    pub fn global_id(&self) -> GlobalId {
1542        self.global_id
1543    }
1544}
1545
1546#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
1547pub struct NetworkPolicy {
1548    pub name: String,
1549    pub id: NetworkPolicyId,
1550    pub oid: u32,
1551    pub rules: Vec<NetworkPolicyRule>,
1552    pub owner_id: RoleId,
1553    pub privileges: PrivilegeMap,
1554}
1555
1556impl From<NetworkPolicy> for durable::NetworkPolicy {
1557    fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1558        durable::NetworkPolicy {
1559            id: policy.id,
1560            oid: policy.oid,
1561            name: policy.name,
1562            rules: policy.rules,
1563            owner_id: policy.owner_id,
1564            privileges: policy.privileges.into_all_values().collect(),
1565        }
1566    }
1567}
1568
1569impl From<durable::NetworkPolicy> for NetworkPolicy {
1570    fn from(
1571        durable::NetworkPolicy {
1572            id,
1573            oid,
1574            name,
1575            rules,
1576            owner_id,
1577            privileges,
1578        }: durable::NetworkPolicy,
1579    ) -> Self {
1580        NetworkPolicy {
1581            id,
1582            oid,
1583            name,
1584            rules,
1585            owner_id,
1586            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1587        }
1588    }
1589}
1590
1591impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1592    fn update_from(
1593        &mut self,
1594        durable::NetworkPolicy {
1595            id,
1596            oid,
1597            name,
1598            rules,
1599            owner_id,
1600            privileges,
1601        }: durable::NetworkPolicy,
1602    ) {
1603        self.id = id;
1604        self.oid = oid;
1605        self.name = name;
1606        self.rules = rules;
1607        self.owner_id = owner_id;
1608        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1609    }
1610}
1611
1612impl CatalogItem {
1613    /// Returns a string indicating the type of this catalog entry.
1614    pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1615        match self {
1616            CatalogItem::Table(_) => mz_sql::catalog::CatalogItemType::Table,
1617            CatalogItem::Source(_) => mz_sql::catalog::CatalogItemType::Source,
1618            CatalogItem::Log(_) => mz_sql::catalog::CatalogItemType::Source,
1619            CatalogItem::Sink(_) => mz_sql::catalog::CatalogItemType::Sink,
1620            CatalogItem::View(_) => mz_sql::catalog::CatalogItemType::View,
1621            CatalogItem::MaterializedView(_) => mz_sql::catalog::CatalogItemType::MaterializedView,
1622            CatalogItem::Index(_) => mz_sql::catalog::CatalogItemType::Index,
1623            CatalogItem::Type(_) => mz_sql::catalog::CatalogItemType::Type,
1624            CatalogItem::Func(_) => mz_sql::catalog::CatalogItemType::Func,
1625            CatalogItem::Secret(_) => mz_sql::catalog::CatalogItemType::Secret,
1626            CatalogItem::Connection(_) => mz_sql::catalog::CatalogItemType::Connection,
1627            CatalogItem::ContinualTask(_) => mz_sql::catalog::CatalogItemType::ContinualTask,
1628        }
1629    }
1630
1631    /// Returns the [`GlobalId`]s that reference this item, if any.
1632    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1633        let gid = match self {
1634            CatalogItem::Source(source) => source.global_id,
1635            CatalogItem::Log(log) => log.global_id,
1636            CatalogItem::Sink(sink) => sink.global_id,
1637            CatalogItem::View(view) => view.global_id,
1638            CatalogItem::MaterializedView(mv) => mv.global_id,
1639            CatalogItem::ContinualTask(ct) => ct.global_id,
1640            CatalogItem::Index(index) => index.global_id,
1641            CatalogItem::Func(func) => func.global_id,
1642            CatalogItem::Type(ty) => ty.global_id,
1643            CatalogItem::Secret(secret) => secret.global_id,
1644            CatalogItem::Connection(conn) => conn.global_id,
1645            CatalogItem::Table(table) => {
1646                return itertools::Either::Left(table.collections.values().copied());
1647            }
1648        };
1649        itertools::Either::Right(std::iter::once(gid))
1650    }
1651
1652    /// Returns the most up-to-date [`GlobalId`] for this item.
1653    ///
1654    /// Note: The only type of object that can have multiple [`GlobalId`]s are tables.
1655    pub fn latest_global_id(&self) -> GlobalId {
1656        match self {
1657            CatalogItem::Source(source) => source.global_id,
1658            CatalogItem::Log(log) => log.global_id,
1659            CatalogItem::Sink(sink) => sink.global_id,
1660            CatalogItem::View(view) => view.global_id,
1661            CatalogItem::MaterializedView(mv) => mv.global_id,
1662            CatalogItem::ContinualTask(ct) => ct.global_id,
1663            CatalogItem::Index(index) => index.global_id,
1664            CatalogItem::Func(func) => func.global_id,
1665            CatalogItem::Type(ty) => ty.global_id,
1666            CatalogItem::Secret(secret) => secret.global_id,
1667            CatalogItem::Connection(conn) => conn.global_id,
1668            CatalogItem::Table(table) => table.global_id_writes(),
1669        }
1670    }
1671
1672    /// Whether this item represents a storage collection.
1673    pub fn is_storage_collection(&self) -> bool {
1674        match self {
1675            CatalogItem::Table(_)
1676            | CatalogItem::Source(_)
1677            | CatalogItem::MaterializedView(_)
1678            | CatalogItem::Sink(_)
1679            | CatalogItem::ContinualTask(_) => true,
1680            CatalogItem::Log(_)
1681            | CatalogItem::View(_)
1682            | CatalogItem::Index(_)
1683            | CatalogItem::Type(_)
1684            | CatalogItem::Func(_)
1685            | CatalogItem::Secret(_)
1686            | CatalogItem::Connection(_) => false,
1687        }
1688    }
1689
1690    pub fn desc(
1691        &self,
1692        name: &FullItemName,
1693        version: RelationVersionSelector,
1694    ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
1695        self.desc_opt(version)
1696            .ok_or_else(|| SqlCatalogError::InvalidDependency {
1697                name: name.to_string(),
1698                typ: self.typ(),
1699            })
1700    }
1701
1702    pub fn desc_opt(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1703        match &self {
1704            CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1705            CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1706            CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1707            CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1708            CatalogItem::MaterializedView(mview) => Some(Cow::Borrowed(&mview.desc)),
1709            CatalogItem::Type(typ) => typ.desc.as_ref().map(Cow::Borrowed),
1710            CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
1711            CatalogItem::Func(_)
1712            | CatalogItem::Index(_)
1713            | CatalogItem::Sink(_)
1714            | CatalogItem::Secret(_)
1715            | CatalogItem::Connection(_) => None,
1716        }
1717    }
1718
1719    pub fn func(
1720        &self,
1721        entry: &CatalogEntry,
1722    ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1723        match &self {
1724            CatalogItem::Func(func) => Ok(func.inner),
1725            _ => Err(SqlCatalogError::UnexpectedType {
1726                name: entry.name().item.to_string(),
1727                actual_type: entry.item_type(),
1728                expected_type: CatalogItemType::Func,
1729            }),
1730        }
1731    }
1732
1733    pub fn source_desc(
1734        &self,
1735        entry: &CatalogEntry,
1736    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1737        match &self {
1738            CatalogItem::Source(source) => match &source.data_source {
1739                DataSourceDesc::Ingestion { desc, .. }
1740                | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1741                DataSourceDesc::IngestionExport { .. }
1742                | DataSourceDesc::Introspection(_)
1743                | DataSourceDesc::Webhook { .. }
1744                | DataSourceDesc::Progress => Ok(None),
1745            },
1746            _ => Err(SqlCatalogError::UnexpectedType {
1747                name: entry.name().item.to_string(),
1748                actual_type: entry.item_type(),
1749                expected_type: CatalogItemType::Source,
1750            }),
1751        }
1752    }
1753
1754    /// Reports whether this catalog entry is a progress source.
1755    pub fn is_progress_source(&self) -> bool {
1756        matches!(
1757            self,
1758            CatalogItem::Source(Source {
1759                data_source: DataSourceDesc::Progress,
1760                ..
1761            })
1762        )
1763    }
1764
1765    /// Collects the identifiers of the objects that were encountered when resolving names in the
1766    /// item's DDL statement.
1767    pub fn references(&self) -> &ResolvedIds {
1768        static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1769        match self {
1770            CatalogItem::Func(_) => &*EMPTY,
1771            CatalogItem::Index(idx) => &idx.resolved_ids,
1772            CatalogItem::Sink(sink) => &sink.resolved_ids,
1773            CatalogItem::Source(source) => &source.resolved_ids,
1774            CatalogItem::Log(_) => &*EMPTY,
1775            CatalogItem::Table(table) => &table.resolved_ids,
1776            CatalogItem::Type(typ) => &typ.resolved_ids,
1777            CatalogItem::View(view) => &view.resolved_ids,
1778            CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1779            CatalogItem::Secret(_) => &*EMPTY,
1780            CatalogItem::Connection(connection) => &connection.resolved_ids,
1781            CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
1782        }
1783    }
1784
1785    /// Collects the identifiers of the objects used by this [`CatalogItem`].
1786    ///
1787    /// Like [`CatalogItem::references()`] but also includes objects that are not directly
1788    /// referenced. For example this will include any catalog objects used to implement functions
1789    /// and casts in the item.
1790    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1791        let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1792        match self {
1793            // TODO(jkosh44) This isn't really correct for functions. They may use other objects in
1794            // their implementation. However, currently there's no way to get that information.
1795            CatalogItem::Func(_) => {}
1796            CatalogItem::Index(_) => {}
1797            CatalogItem::Sink(_) => {}
1798            CatalogItem::Source(_) => {}
1799            CatalogItem::Log(_) => {}
1800            CatalogItem::Table(_) => {}
1801            CatalogItem::Type(_) => {}
1802            CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1803            CatalogItem::MaterializedView(mview) => {
1804                uses.extend(mview.dependencies.0.iter().copied())
1805            }
1806            CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
1807            CatalogItem::Secret(_) => {}
1808            CatalogItem::Connection(_) => {}
1809        }
1810        uses
1811    }
1812
1813    /// Returns the connection ID that this item belongs to, if this item is
1814    /// temporary.
1815    pub fn conn_id(&self) -> Option<&ConnectionId> {
1816        match self {
1817            CatalogItem::View(view) => view.conn_id.as_ref(),
1818            CatalogItem::Index(index) => index.conn_id.as_ref(),
1819            CatalogItem::Table(table) => table.conn_id.as_ref(),
1820            CatalogItem::Log(_)
1821            | CatalogItem::Source(_)
1822            | CatalogItem::Sink(_)
1823            | CatalogItem::MaterializedView(_)
1824            | CatalogItem::Secret(_)
1825            | CatalogItem::Type(_)
1826            | CatalogItem::Func(_)
1827            | CatalogItem::Connection(_)
1828            | CatalogItem::ContinualTask(_) => None,
1829        }
1830    }
1831
1832    /// Indicates whether this item is temporary or not.
1833    pub fn is_temporary(&self) -> bool {
1834        self.conn_id().is_some()
1835    }
1836
1837    pub fn rename_schema_refs(
1838        &self,
1839        database_name: &str,
1840        cur_schema_name: &str,
1841        new_schema_name: &str,
1842    ) -> Result<CatalogItem, (String, String)> {
1843        let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
1844            let mut create_stmt = mz_sql::parse::parse(&create_sql)
1845                .expect("invalid create sql persisted to catalog")
1846                .into_element()
1847                .ast;
1848
1849            // Rename all references to cur_schema_name.
1850            mz_sql::ast::transform::create_stmt_rename_schema_refs(
1851                &mut create_stmt,
1852                database_name,
1853                cur_schema_name,
1854                new_schema_name,
1855            )?;
1856
1857            Ok(create_stmt.to_ast_string_stable())
1858        };
1859
1860        match self {
1861            CatalogItem::Table(i) => {
1862                let mut i = i.clone();
1863                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1864                Ok(CatalogItem::Table(i))
1865            }
1866            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1867            CatalogItem::Source(i) => {
1868                let mut i = i.clone();
1869                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1870                Ok(CatalogItem::Source(i))
1871            }
1872            CatalogItem::Sink(i) => {
1873                let mut i = i.clone();
1874                i.create_sql = do_rewrite(i.create_sql)?;
1875                Ok(CatalogItem::Sink(i))
1876            }
1877            CatalogItem::View(i) => {
1878                let mut i = i.clone();
1879                i.create_sql = do_rewrite(i.create_sql)?;
1880                Ok(CatalogItem::View(i))
1881            }
1882            CatalogItem::MaterializedView(i) => {
1883                let mut i = i.clone();
1884                i.create_sql = do_rewrite(i.create_sql)?;
1885                Ok(CatalogItem::MaterializedView(i))
1886            }
1887            CatalogItem::Index(i) => {
1888                let mut i = i.clone();
1889                i.create_sql = do_rewrite(i.create_sql)?;
1890                Ok(CatalogItem::Index(i))
1891            }
1892            CatalogItem::Secret(i) => {
1893                let mut i = i.clone();
1894                i.create_sql = do_rewrite(i.create_sql)?;
1895                Ok(CatalogItem::Secret(i))
1896            }
1897            CatalogItem::Connection(i) => {
1898                let mut i = i.clone();
1899                i.create_sql = do_rewrite(i.create_sql)?;
1900                Ok(CatalogItem::Connection(i))
1901            }
1902            CatalogItem::Type(i) => {
1903                let mut i = i.clone();
1904                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1905                Ok(CatalogItem::Type(i))
1906            }
1907            CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
1908            CatalogItem::ContinualTask(i) => {
1909                let mut i = i.clone();
1910                i.create_sql = do_rewrite(i.create_sql)?;
1911                Ok(CatalogItem::ContinualTask(i))
1912            }
1913        }
1914    }
1915
1916    /// Returns a clone of `self` with all instances of `from` renamed to `to`
1917    /// (with the option of including the item's own name) or errors if request
1918    /// is ambiguous.
1919    pub fn rename_item_refs(
1920        &self,
1921        from: FullItemName,
1922        to_item_name: String,
1923        rename_self: bool,
1924    ) -> Result<CatalogItem, String> {
1925        let do_rewrite = |create_sql: String| -> Result<String, String> {
1926            let mut create_stmt = mz_sql::parse::parse(&create_sql)
1927                .expect("invalid create sql persisted to catalog")
1928                .into_element()
1929                .ast;
1930            if rename_self {
1931                mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
1932            }
1933            // Determination of what constitutes an ambiguous request is done here.
1934            mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
1935            Ok(create_stmt.to_ast_string_stable())
1936        };
1937
1938        match self {
1939            CatalogItem::Table(i) => {
1940                let mut i = i.clone();
1941                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1942                Ok(CatalogItem::Table(i))
1943            }
1944            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1945            CatalogItem::Source(i) => {
1946                let mut i = i.clone();
1947                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1948                Ok(CatalogItem::Source(i))
1949            }
1950            CatalogItem::Sink(i) => {
1951                let mut i = i.clone();
1952                i.create_sql = do_rewrite(i.create_sql)?;
1953                Ok(CatalogItem::Sink(i))
1954            }
1955            CatalogItem::View(i) => {
1956                let mut i = i.clone();
1957                i.create_sql = do_rewrite(i.create_sql)?;
1958                Ok(CatalogItem::View(i))
1959            }
1960            CatalogItem::MaterializedView(i) => {
1961                let mut i = i.clone();
1962                i.create_sql = do_rewrite(i.create_sql)?;
1963                Ok(CatalogItem::MaterializedView(i))
1964            }
1965            CatalogItem::Index(i) => {
1966                let mut i = i.clone();
1967                i.create_sql = do_rewrite(i.create_sql)?;
1968                Ok(CatalogItem::Index(i))
1969            }
1970            CatalogItem::Secret(i) => {
1971                let mut i = i.clone();
1972                i.create_sql = do_rewrite(i.create_sql)?;
1973                Ok(CatalogItem::Secret(i))
1974            }
1975            CatalogItem::Func(_) | CatalogItem::Type(_) => {
1976                unreachable!("{}s cannot be renamed", self.typ())
1977            }
1978            CatalogItem::Connection(i) => {
1979                let mut i = i.clone();
1980                i.create_sql = do_rewrite(i.create_sql)?;
1981                Ok(CatalogItem::Connection(i))
1982            }
1983            CatalogItem::ContinualTask(i) => {
1984                let mut i = i.clone();
1985                i.create_sql = do_rewrite(i.create_sql)?;
1986                Ok(CatalogItem::ContinualTask(i))
1987            }
1988        }
1989    }
1990
1991    /// Updates the retain history for an item. Returns the previous retain history value. Returns
1992    /// an error if this item does not support retain history.
1993    pub fn update_retain_history(
1994        &mut self,
1995        value: Option<Value>,
1996        window: CompactionWindow,
1997    ) -> Result<Option<WithOptionValue<Raw>>, ()> {
1998        let update = |mut ast: &mut Statement<Raw>| {
1999            // Each statement type has unique option types. This macro handles them commonly.
2000            macro_rules! update_retain_history {
2001                ( $stmt:ident, $opt:ident, $name:ident ) => {{
2002                    // Replace or add the option.
2003                    let pos = $stmt
2004                        .with_options
2005                        .iter()
2006                        // In case there are ever multiple, look for the last one.
2007                        .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
2008                    if let Some(value) = value {
2009                        let next = mz_sql_parser::ast::$opt {
2010                            name: mz_sql_parser::ast::$name::RetainHistory,
2011                            value: Some(WithOptionValue::RetainHistoryFor(value)),
2012                        };
2013                        if let Some(idx) = pos {
2014                            let previous = $stmt.with_options[idx].clone();
2015                            $stmt.with_options[idx] = next;
2016                            previous.value
2017                        } else {
2018                            $stmt.with_options.push(next);
2019                            None
2020                        }
2021                    } else {
2022                        if let Some(idx) = pos {
2023                            $stmt.with_options.swap_remove(idx).value
2024                        } else {
2025                            None
2026                        }
2027                    }
2028                }};
2029            }
2030            let previous = match &mut ast {
2031                Statement::CreateTable(stmt) => {
2032                    update_retain_history!(stmt, TableOption, TableOptionName)
2033                }
2034                Statement::CreateIndex(stmt) => {
2035                    update_retain_history!(stmt, IndexOption, IndexOptionName)
2036                }
2037                Statement::CreateSource(stmt) => {
2038                    update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
2039                }
2040                Statement::CreateMaterializedView(stmt) => {
2041                    update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
2042                }
2043                _ => {
2044                    return Err(());
2045                }
2046            };
2047            Ok(previous)
2048        };
2049
2050        let res = self.update_sql(update)?;
2051        let cw = self
2052            .custom_logical_compaction_window_mut()
2053            .expect("item must have compaction window");
2054        *cw = Some(window);
2055        Ok(res)
2056    }
2057
2058    pub fn add_column(
2059        &mut self,
2060        name: ColumnName,
2061        typ: SqlColumnType,
2062        sql: RawDataType,
2063    ) -> Result<RelationVersion, PlanError> {
2064        let CatalogItem::Table(table) = self else {
2065            return Err(PlanError::Unsupported {
2066                feature: "adding columns to a non-Table".to_string(),
2067                discussion_no: None,
2068            });
2069        };
2070        let next_version = table.desc.add_column(name.clone(), typ);
2071
2072        let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2073            Statement::CreateTable(stmt) => {
2074                let version = ColumnOptionDef {
2075                    name: None,
2076                    option: ColumnOption::Versioned {
2077                        action: ColumnVersioned::Added,
2078                        version: next_version.into(),
2079                    },
2080                };
2081                let column = ColumnDef {
2082                    name: name.into(),
2083                    data_type: sql,
2084                    collation: None,
2085                    options: vec![version],
2086                };
2087                stmt.columns.push(column);
2088                Ok(())
2089            }
2090            _ => Err(()),
2091        };
2092
2093        self.update_sql(update)
2094            .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2095        Ok(next_version)
2096    }
2097
2098    /// Updates the create_sql field of this item. Returns an error if this is a builtin item,
2099    /// otherwise returns f's result.
2100    pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2101    where
2102        F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2103    {
2104        let create_sql = match self {
2105            CatalogItem::Table(Table { create_sql, .. })
2106            | CatalogItem::Type(Type { create_sql, .. })
2107            | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2108            CatalogItem::Sink(Sink { create_sql, .. })
2109            | CatalogItem::View(View { create_sql, .. })
2110            | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2111            | CatalogItem::Index(Index { create_sql, .. })
2112            | CatalogItem::Secret(Secret { create_sql, .. })
2113            | CatalogItem::Connection(Connection { create_sql, .. })
2114            | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
2115            CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2116        };
2117        let Some(create_sql) = create_sql else {
2118            return Err(());
2119        };
2120        let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2121            .expect("non-system items must be parseable")
2122            .into_element()
2123            .ast;
2124        debug!("rewrite: {}", ast.to_ast_string_redacted());
2125        let t = f(&mut ast)?;
2126        *create_sql = ast.to_ast_string_stable();
2127        debug!("rewrote: {}", ast.to_ast_string_redacted());
2128        Ok(t)
2129    }
2130
2131    /// If the object is considered a "compute object"
2132    /// (i.e., it is managed by the compute controller),
2133    /// this function returns its cluster ID. Otherwise, it returns nothing.
2134    ///
2135    /// This function differs from `cluster_id` because while all
2136    /// compute objects run on a cluster, the converse is not true.
2137    pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2138        match self {
2139            CatalogItem::Index(index) => Some(index.cluster_id),
2140            CatalogItem::Table(_)
2141            | CatalogItem::Source(_)
2142            | CatalogItem::Log(_)
2143            | CatalogItem::View(_)
2144            | CatalogItem::MaterializedView(_)
2145            | CatalogItem::Sink(_)
2146            | CatalogItem::Type(_)
2147            | CatalogItem::Func(_)
2148            | CatalogItem::Secret(_)
2149            | CatalogItem::Connection(_)
2150            | CatalogItem::ContinualTask(_) => None,
2151        }
2152    }
2153
2154    pub fn cluster_id(&self) -> Option<ClusterId> {
2155        match self {
2156            CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2157            CatalogItem::Index(index) => Some(index.cluster_id),
2158            CatalogItem::Source(source) => match &source.data_source {
2159                DataSourceDesc::Ingestion { cluster_id, .. }
2160                | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
2161                // This is somewhat of a lie because the export runs on the same
2162                // cluster as its ingestion but we don't yet have a way of
2163                // cross-referencing the items
2164                DataSourceDesc::IngestionExport { .. } => None,
2165                DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2166                DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => None,
2167            },
2168            CatalogItem::Sink(sink) => Some(sink.cluster_id),
2169            CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
2170            CatalogItem::Table(_)
2171            | CatalogItem::Log(_)
2172            | CatalogItem::View(_)
2173            | CatalogItem::Type(_)
2174            | CatalogItem::Func(_)
2175            | CatalogItem::Secret(_)
2176            | CatalogItem::Connection(_) => None,
2177        }
2178    }
2179
2180    /// The custom compaction window, if any has been set. This does not reflect any propagated
2181    /// compaction window (i.e., source -> subsource).
2182    pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2183        match self {
2184            CatalogItem::Table(table) => table.custom_logical_compaction_window,
2185            CatalogItem::Source(source) => source.custom_logical_compaction_window,
2186            CatalogItem::Index(index) => index.custom_logical_compaction_window,
2187            CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2188            CatalogItem::Log(_)
2189            | CatalogItem::View(_)
2190            | CatalogItem::Sink(_)
2191            | CatalogItem::Type(_)
2192            | CatalogItem::Func(_)
2193            | CatalogItem::Secret(_)
2194            | CatalogItem::Connection(_)
2195            | CatalogItem::ContinualTask(_) => None,
2196        }
2197    }
2198
2199    /// Mutable access to the custom compaction window, or None if this type does not support custom
2200    /// compaction windows. This does not reflect any propagated compaction window (i.e., source ->
2201    /// subsource).
2202    pub fn custom_logical_compaction_window_mut(
2203        &mut self,
2204    ) -> Option<&mut Option<CompactionWindow>> {
2205        let cw = match self {
2206            CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2207            CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2208            CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2209            CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2210            CatalogItem::Log(_)
2211            | CatalogItem::View(_)
2212            | CatalogItem::Sink(_)
2213            | CatalogItem::Type(_)
2214            | CatalogItem::Func(_)
2215            | CatalogItem::Secret(_)
2216            | CatalogItem::Connection(_)
2217            | CatalogItem::ContinualTask(_) => return None,
2218        };
2219        Some(cw)
2220    }
2221
2222    /// The initial compaction window, for objects that have one; that is, tables, sources, indexes,
2223    /// and MVs. This does not reflect any propagated compaction window (i.e., source -> subsource).
2224    ///
2225    /// If `custom_logical_compaction_window()` returns something, use that.  Otherwise, use a
2226    /// sensible default (currently 1s).
2227    ///
2228    /// For objects that do not have the concept of compaction window, return None.
2229    pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2230        let custom_logical_compaction_window = match self {
2231            CatalogItem::Table(_)
2232            | CatalogItem::Source(_)
2233            | CatalogItem::Index(_)
2234            | CatalogItem::MaterializedView(_)
2235            | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
2236            CatalogItem::Log(_)
2237            | CatalogItem::View(_)
2238            | CatalogItem::Sink(_)
2239            | CatalogItem::Type(_)
2240            | CatalogItem::Func(_)
2241            | CatalogItem::Secret(_)
2242            | CatalogItem::Connection(_) => return None,
2243        };
2244        Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2245    }
2246
2247    /// Whether the item's logical compaction window
2248    /// is controlled by the METRICS_RETENTION
2249    /// system var.
2250    pub fn is_retained_metrics_object(&self) -> bool {
2251        match self {
2252            CatalogItem::Table(table) => table.is_retained_metrics_object,
2253            CatalogItem::Source(source) => source.is_retained_metrics_object,
2254            CatalogItem::Index(index) => index.is_retained_metrics_object,
2255            CatalogItem::Log(_)
2256            | CatalogItem::View(_)
2257            | CatalogItem::MaterializedView(_)
2258            | CatalogItem::Sink(_)
2259            | CatalogItem::Type(_)
2260            | CatalogItem::Func(_)
2261            | CatalogItem::Secret(_)
2262            | CatalogItem::Connection(_)
2263            | CatalogItem::ContinualTask(_) => false,
2264        }
2265    }
2266
2267    pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2268        match self {
2269            CatalogItem::Table(table) => {
2270                let create_sql = table
2271                    .create_sql
2272                    .clone()
2273                    .expect("builtin tables cannot be serialized");
2274                let mut collections = table.collections.clone();
2275                let global_id = collections
2276                    .remove(&RelationVersion::root())
2277                    .expect("at least one version");
2278                (create_sql, global_id, collections)
2279            }
2280            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2281            CatalogItem::Source(source) => {
2282                assert!(
2283                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2284                    "cannot serialize introspection/builtin sources",
2285                );
2286                let create_sql = source
2287                    .create_sql
2288                    .clone()
2289                    .expect("builtin sources cannot be serialized");
2290                (create_sql, source.global_id, BTreeMap::new())
2291            }
2292            CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2293            CatalogItem::MaterializedView(mview) => {
2294                (mview.create_sql.clone(), mview.global_id, BTreeMap::new())
2295            }
2296            CatalogItem::Index(index) => {
2297                (index.create_sql.clone(), index.global_id, BTreeMap::new())
2298            }
2299            CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2300            CatalogItem::Type(typ) => {
2301                let create_sql = typ
2302                    .create_sql
2303                    .clone()
2304                    .expect("builtin types cannot be serialized");
2305                (create_sql, typ.global_id, BTreeMap::new())
2306            }
2307            CatalogItem::Secret(secret) => {
2308                (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2309            }
2310            CatalogItem::Connection(connection) => (
2311                connection.create_sql.clone(),
2312                connection.global_id,
2313                BTreeMap::new(),
2314            ),
2315            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2316            CatalogItem::ContinualTask(ct) => {
2317                (ct.create_sql.clone(), ct.global_id, BTreeMap::new())
2318            }
2319        }
2320    }
2321
2322    pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2323        match self {
2324            CatalogItem::Table(mut table) => {
2325                let create_sql = table
2326                    .create_sql
2327                    .expect("builtin tables cannot be serialized");
2328                let global_id = table
2329                    .collections
2330                    .remove(&RelationVersion::root())
2331                    .expect("at least one version");
2332                (create_sql, global_id, table.collections)
2333            }
2334            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2335            CatalogItem::Source(source) => {
2336                assert!(
2337                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2338                    "cannot serialize introspection/builtin sources",
2339                );
2340                let create_sql = source
2341                    .create_sql
2342                    .expect("builtin sources cannot be serialized");
2343                (create_sql, source.global_id, BTreeMap::new())
2344            }
2345            CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2346            CatalogItem::MaterializedView(mview) => {
2347                (mview.create_sql, mview.global_id, BTreeMap::new())
2348            }
2349            CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2350            CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2351            CatalogItem::Type(typ) => {
2352                let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2353                (create_sql, typ.global_id, BTreeMap::new())
2354            }
2355            CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2356            CatalogItem::Connection(connection) => {
2357                (connection.create_sql, connection.global_id, BTreeMap::new())
2358            }
2359            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2360            CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
2361        }
2362    }
2363}
2364
2365impl CatalogEntry {
2366    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`],
2367    /// returning an error if this [`CatalogEntry`] does not produce rows.
2368    ///
2369    /// If you need to get the [`RelationDesc`] for a specific version, see [`CatalogItem::desc`].
2370    pub fn desc_latest(
2371        &self,
2372        name: &FullItemName,
2373    ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
2374        self.item.desc(name, RelationVersionSelector::Latest)
2375    }
2376
2377    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`], if it
2378    /// produces rows.
2379    pub fn desc_opt_latest(&self) -> Option<Cow<'_, RelationDesc>> {
2380        self.item.desc_opt(RelationVersionSelector::Latest)
2381    }
2382
2383    /// Reports if the item has columns.
2384    pub fn has_columns(&self) -> bool {
2385        self.desc_opt_latest().is_some()
2386    }
2387
2388    /// Returns the [`mz_sql::func::Func`] associated with this `CatalogEntry`.
2389    pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2390        self.item.func(self)
2391    }
2392
2393    /// Returns the inner [`Index`] if this entry is an index, else `None`.
2394    pub fn index(&self) -> Option<&Index> {
2395        match self.item() {
2396            CatalogItem::Index(idx) => Some(idx),
2397            _ => None,
2398        }
2399    }
2400
2401    /// Returns the inner [`MaterializedView`] if this entry is a materialized view, else `None`.
2402    pub fn materialized_view(&self) -> Option<&MaterializedView> {
2403        match self.item() {
2404            CatalogItem::MaterializedView(mv) => Some(mv),
2405            _ => None,
2406        }
2407    }
2408
2409    /// Returns the inner [`Table`] if this entry is a table, else `None`.
2410    pub fn table(&self) -> Option<&Table> {
2411        match self.item() {
2412            CatalogItem::Table(tbl) => Some(tbl),
2413            _ => None,
2414        }
2415    }
2416
2417    /// Returns the inner [`Source`] if this entry is a source, else `None`.
2418    pub fn source(&self) -> Option<&Source> {
2419        match self.item() {
2420            CatalogItem::Source(src) => Some(src),
2421            _ => None,
2422        }
2423    }
2424
2425    /// Returns the inner [`Sink`] if this entry is a sink, else `None`.
2426    pub fn sink(&self) -> Option<&Sink> {
2427        match self.item() {
2428            CatalogItem::Sink(sink) => Some(sink),
2429            _ => None,
2430        }
2431    }
2432
2433    /// Returns the inner [`Secret`] if this entry is a secret, else `None`.
2434    pub fn secret(&self) -> Option<&Secret> {
2435        match self.item() {
2436            CatalogItem::Secret(secret) => Some(secret),
2437            _ => None,
2438        }
2439    }
2440
2441    pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2442        match self.item() {
2443            CatalogItem::Connection(connection) => Ok(connection),
2444            _ => {
2445                let db_name = match self.name().qualifiers.database_spec {
2446                    ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2447                    ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2448                };
2449                Err(SqlCatalogError::UnknownConnection(format!(
2450                    "{}{}.{}",
2451                    db_name,
2452                    self.name().qualifiers.schema_spec,
2453                    self.name().item
2454                )))
2455            }
2456        }
2457    }
2458
2459    /// Returns the [`mz_storage_types::sources::SourceDesc`] associated with
2460    /// this `CatalogEntry`, if any.
2461    pub fn source_desc(
2462        &self,
2463    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2464        self.item.source_desc(self)
2465    }
2466
2467    /// Reports whether this catalog entry is a connection.
2468    pub fn is_connection(&self) -> bool {
2469        matches!(self.item(), CatalogItem::Connection(_))
2470    }
2471
2472    /// Reports whether this catalog entry is a table.
2473    pub fn is_table(&self) -> bool {
2474        matches!(self.item(), CatalogItem::Table(_))
2475    }
2476
2477    /// Reports whether this catalog entry is a source. Note that this includes
2478    /// subsources.
2479    pub fn is_source(&self) -> bool {
2480        matches!(self.item(), CatalogItem::Source(_))
2481    }
2482
2483    /// Reports whether this catalog entry is a subsource and, if it is, the
2484    /// ingestion it is an export of, as well as the item it exports.
2485    pub fn subsource_details(
2486        &self,
2487    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2488        match &self.item() {
2489            CatalogItem::Source(source) => match &source.data_source {
2490                DataSourceDesc::IngestionExport {
2491                    ingestion_id,
2492                    external_reference,
2493                    details,
2494                    data_config: _,
2495                } => Some((*ingestion_id, external_reference, details)),
2496                _ => None,
2497            },
2498            _ => None,
2499        }
2500    }
2501
2502    /// Reports whether this catalog entry is a source export and, if it is, the
2503    /// ingestion it is an export of, as well as the item it exports.
2504    pub fn source_export_details(
2505        &self,
2506    ) -> Option<(
2507        CatalogItemId,
2508        &UnresolvedItemName,
2509        &SourceExportDetails,
2510        &SourceExportDataConfig<ReferencedConnection>,
2511    )> {
2512        match &self.item() {
2513            CatalogItem::Source(source) => match &source.data_source {
2514                DataSourceDesc::IngestionExport {
2515                    ingestion_id,
2516                    external_reference,
2517                    details,
2518                    data_config,
2519                } => Some((*ingestion_id, external_reference, details, data_config)),
2520                _ => None,
2521            },
2522            CatalogItem::Table(table) => match &table.data_source {
2523                TableDataSource::DataSource {
2524                    desc:
2525                        DataSourceDesc::IngestionExport {
2526                            ingestion_id,
2527                            external_reference,
2528                            details,
2529                            data_config,
2530                        },
2531                    timeline: _,
2532                } => Some((*ingestion_id, external_reference, details, data_config)),
2533                _ => None,
2534            },
2535            _ => None,
2536        }
2537    }
2538
2539    /// Reports whether this catalog entry is a progress source.
2540    pub fn is_progress_source(&self) -> bool {
2541        self.item().is_progress_source()
2542    }
2543
2544    /// Returns the `GlobalId` of all of this entry's progress ID.
2545    pub fn progress_id(&self) -> Option<CatalogItemId> {
2546        match &self.item() {
2547            CatalogItem::Source(source) => match &source.data_source {
2548                DataSourceDesc::Ingestion { .. } => Some(self.id),
2549                DataSourceDesc::OldSyntaxIngestion {
2550                    progress_subsource, ..
2551                } => Some(*progress_subsource),
2552                DataSourceDesc::IngestionExport { .. }
2553                | DataSourceDesc::Introspection(_)
2554                | DataSourceDesc::Progress
2555                | DataSourceDesc::Webhook { .. } => None,
2556            },
2557            CatalogItem::Table(_)
2558            | CatalogItem::Log(_)
2559            | CatalogItem::View(_)
2560            | CatalogItem::MaterializedView(_)
2561            | CatalogItem::Sink(_)
2562            | CatalogItem::Index(_)
2563            | CatalogItem::Type(_)
2564            | CatalogItem::Func(_)
2565            | CatalogItem::Secret(_)
2566            | CatalogItem::Connection(_)
2567            | CatalogItem::ContinualTask(_) => None,
2568        }
2569    }
2570
2571    /// Reports whether this catalog entry is a sink.
2572    pub fn is_sink(&self) -> bool {
2573        matches!(self.item(), CatalogItem::Sink(_))
2574    }
2575
2576    /// Reports whether this catalog entry is a materialized view.
2577    pub fn is_materialized_view(&self) -> bool {
2578        matches!(self.item(), CatalogItem::MaterializedView(_))
2579    }
2580
2581    /// Reports whether this catalog entry is a view.
2582    pub fn is_view(&self) -> bool {
2583        matches!(self.item(), CatalogItem::View(_))
2584    }
2585
2586    /// Reports whether this catalog entry is a secret.
2587    pub fn is_secret(&self) -> bool {
2588        matches!(self.item(), CatalogItem::Secret(_))
2589    }
2590
2591    /// Reports whether this catalog entry is an introspection source.
2592    pub fn is_introspection_source(&self) -> bool {
2593        matches!(self.item(), CatalogItem::Log(_))
2594    }
2595
2596    /// Reports whether this catalog entry is an index.
2597    pub fn is_index(&self) -> bool {
2598        matches!(self.item(), CatalogItem::Index(_))
2599    }
2600
2601    /// Reports whether this catalog entry is a continual task.
2602    pub fn is_continual_task(&self) -> bool {
2603        matches!(self.item(), CatalogItem::ContinualTask(_))
2604    }
2605
2606    /// Reports whether this catalog entry can be treated as a relation, it can produce rows.
2607    pub fn is_relation(&self) -> bool {
2608        mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2609    }
2610
2611    /// Collects the identifiers of the objects that were encountered when
2612    /// resolving names in the item's DDL statement.
2613    pub fn references(&self) -> &ResolvedIds {
2614        self.item.references()
2615    }
2616
2617    /// Collects the identifiers of the objects used by this [`CatalogEntry`].
2618    ///
2619    /// Like [`CatalogEntry::references()`] but also includes objects that are not directly
2620    /// referenced. For example this will include any catalog objects used to implement functions
2621    /// and casts in the item.
2622    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2623        self.item.uses()
2624    }
2625
2626    /// Returns the `CatalogItem` associated with this catalog entry.
2627    pub fn item(&self) -> &CatalogItem {
2628        &self.item
2629    }
2630
2631    /// Returns the [`CatalogItemId`] of this catalog entry.
2632    pub fn id(&self) -> CatalogItemId {
2633        self.id
2634    }
2635
2636    /// Returns all of the [`GlobalId`]s associated with this item.
2637    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2638        self.item().global_ids()
2639    }
2640
2641    pub fn latest_global_id(&self) -> GlobalId {
2642        self.item().latest_global_id()
2643    }
2644
2645    /// Returns the OID of this catalog entry.
2646    pub fn oid(&self) -> u32 {
2647        self.oid
2648    }
2649
2650    /// Returns the fully qualified name of this catalog entry.
2651    pub fn name(&self) -> &QualifiedItemName {
2652        &self.name
2653    }
2654
2655    /// Returns the identifiers of the dataflows that are directly referenced by this dataflow.
2656    pub fn referenced_by(&self) -> &[CatalogItemId] {
2657        &self.referenced_by
2658    }
2659
2660    /// Returns the identifiers of the dataflows that depend upon this dataflow.
2661    pub fn used_by(&self) -> &[CatalogItemId] {
2662        &self.used_by
2663    }
2664
2665    /// Returns the connection ID that this item belongs to, if this item is
2666    /// temporary.
2667    pub fn conn_id(&self) -> Option<&ConnectionId> {
2668        self.item.conn_id()
2669    }
2670
2671    /// Returns the role ID of the entry owner.
2672    pub fn owner_id(&self) -> &RoleId {
2673        &self.owner_id
2674    }
2675
2676    /// Returns the privileges of the entry.
2677    pub fn privileges(&self) -> &PrivilegeMap {
2678        &self.privileges
2679    }
2680}
2681
2682#[derive(Debug, Clone, Default)]
2683pub struct CommentsMap {
2684    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
2685}
2686
2687impl CommentsMap {
2688    pub fn update_comment(
2689        &mut self,
2690        object_id: CommentObjectId,
2691        sub_component: Option<usize>,
2692        comment: Option<String>,
2693    ) -> Option<String> {
2694        let object_comments = self.map.entry(object_id).or_default();
2695
2696        // Either replace the existing comment, or remove it if comment is None/NULL.
2697        let (empty, prev) = if let Some(comment) = comment {
2698            let prev = object_comments.insert(sub_component, comment);
2699            (false, prev)
2700        } else {
2701            let prev = object_comments.remove(&sub_component);
2702            (object_comments.is_empty(), prev)
2703        };
2704
2705        // Cleanup entries that are now empty.
2706        if empty {
2707            self.map.remove(&object_id);
2708        }
2709
2710        // Return the previous comment, if there was one, for easy removal.
2711        prev
2712    }
2713
2714    /// Remove all comments for `object_id` from the map.
2715    ///
2716    /// Generally there is one comment for a given [`CommentObjectId`], but in the case of
2717    /// relations you can also have comments on the individual columns. Dropping the comments for a
2718    /// relation will also drop all of the comments on any columns.
2719    pub fn drop_comments(
2720        &mut self,
2721        object_ids: &BTreeSet<CommentObjectId>,
2722    ) -> Vec<(CommentObjectId, Option<usize>, String)> {
2723        let mut removed_comments = Vec::new();
2724
2725        for object_id in object_ids {
2726            if let Some(comments) = self.map.remove(object_id) {
2727                let removed = comments
2728                    .into_iter()
2729                    .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
2730                removed_comments.extend(removed);
2731            }
2732        }
2733
2734        removed_comments
2735    }
2736
2737    pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
2738        self.map
2739            .iter()
2740            .map(|(id, comments)| {
2741                comments
2742                    .iter()
2743                    .map(|(pos, comment)| (*id, *pos, comment.as_str()))
2744            })
2745            .flatten()
2746    }
2747
2748    pub fn get_object_comments(
2749        &self,
2750        object_id: CommentObjectId,
2751    ) -> Option<&BTreeMap<Option<usize>, String>> {
2752        self.map.get(&object_id)
2753    }
2754}
2755
2756impl Serialize for CommentsMap {
2757    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2758    where
2759        S: serde::Serializer,
2760    {
2761        let comment_count = self
2762            .map
2763            .iter()
2764            .map(|(_object_id, comments)| comments.len())
2765            .sum();
2766
2767        let mut seq = serializer.serialize_seq(Some(comment_count))?;
2768        for (object_id, sub) in &self.map {
2769            for (sub_component, comment) in sub {
2770                seq.serialize_element(&(
2771                    format!("{object_id:?}"),
2772                    format!("{sub_component:?}"),
2773                    comment,
2774                ))?;
2775            }
2776        }
2777        seq.end()
2778    }
2779}
2780
2781#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
2782pub struct DefaultPrivileges {
2783    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
2784    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
2785}
2786
2787// Use a new type here because otherwise we have two levels of BTreeMap, both needing
2788// map_key_to_string.
2789#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
2790struct RoleDefaultPrivileges(
2791    /// Denormalized, the key is the grantee Role.
2792    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
2793    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
2794);
2795
2796impl Deref for RoleDefaultPrivileges {
2797    type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
2798
2799    fn deref(&self) -> &Self::Target {
2800        &self.0
2801    }
2802}
2803
2804impl DerefMut for RoleDefaultPrivileges {
2805    fn deref_mut(&mut self) -> &mut Self::Target {
2806        &mut self.0
2807    }
2808}
2809
2810impl DefaultPrivileges {
2811    /// Add a new default privilege into the set of all default privileges.
2812    pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
2813        if privilege.acl_mode.is_empty() {
2814            return;
2815        }
2816
2817        let privileges = self.privileges.entry(object).or_default();
2818        if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2819            default_privilege.acl_mode |= privilege.acl_mode;
2820        } else {
2821            privileges.insert(privilege.grantee, privilege);
2822        }
2823    }
2824
2825    /// Revoke a default privilege from the set of all default privileges.
2826    pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
2827        if let Some(privileges) = self.privileges.get_mut(object) {
2828            if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2829                default_privilege.acl_mode =
2830                    default_privilege.acl_mode.difference(privilege.acl_mode);
2831                if default_privilege.acl_mode.is_empty() {
2832                    privileges.remove(&privilege.grantee);
2833                }
2834            }
2835            if privileges.is_empty() {
2836                self.privileges.remove(object);
2837            }
2838        }
2839    }
2840
2841    /// Get the privileges that will be granted on all objects matching `object` to `grantee`, if
2842    /// any exist.
2843    pub fn get_privileges_for_grantee(
2844        &self,
2845        object: &DefaultPrivilegeObject,
2846        grantee: &RoleId,
2847    ) -> Option<&AclMode> {
2848        self.privileges
2849            .get(object)
2850            .and_then(|privileges| privileges.get(grantee))
2851            .map(|privilege| &privilege.acl_mode)
2852    }
2853
2854    /// Get all default privileges that apply to the provided object details.
2855    pub fn get_applicable_privileges(
2856        &self,
2857        role_id: RoleId,
2858        database_id: Option<DatabaseId>,
2859        schema_id: Option<SchemaId>,
2860        object_type: mz_sql::catalog::ObjectType,
2861    ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
2862        // Privileges consider all relations to be of type table due to PostgreSQL compatibility. We
2863        // don't require the caller to worry about that and we will map their `object_type` to the
2864        // correct type for privileges.
2865        let privilege_object_type = if object_type.is_relation() {
2866            mz_sql::catalog::ObjectType::Table
2867        } else {
2868            object_type
2869        };
2870        let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
2871
2872        // Collect all entries that apply to the provided object details.
2873        // If either `database_id` or `schema_id` are `None`, then we might end up with duplicate
2874        // entries in the vec below. That's OK because we consolidate the results after.
2875        [
2876            DefaultPrivilegeObject {
2877                role_id,
2878                database_id,
2879                schema_id,
2880                object_type: privilege_object_type,
2881            },
2882            DefaultPrivilegeObject {
2883                role_id,
2884                database_id,
2885                schema_id: None,
2886                object_type: privilege_object_type,
2887            },
2888            DefaultPrivilegeObject {
2889                role_id,
2890                database_id: None,
2891                schema_id: None,
2892                object_type: privilege_object_type,
2893            },
2894            DefaultPrivilegeObject {
2895                role_id: RoleId::Public,
2896                database_id,
2897                schema_id,
2898                object_type: privilege_object_type,
2899            },
2900            DefaultPrivilegeObject {
2901                role_id: RoleId::Public,
2902                database_id,
2903                schema_id: None,
2904                object_type: privilege_object_type,
2905            },
2906            DefaultPrivilegeObject {
2907                role_id: RoleId::Public,
2908                database_id: None,
2909                schema_id: None,
2910                object_type: privilege_object_type,
2911            },
2912        ]
2913        .into_iter()
2914        .filter_map(|object| self.privileges.get(&object))
2915        .flat_map(|acl_map| acl_map.values())
2916        // Consolidate privileges with a common grantee.
2917        .fold(
2918            BTreeMap::new(),
2919            |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
2920                let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
2921                *accum_acl_mode |= *acl_mode;
2922                accum
2923            },
2924        )
2925        .into_iter()
2926        // Restrict the acl_mode to only privileges valid for the provided object type. If the
2927        // default privilege has an object type of Table, then it may contain privileges valid for
2928        // tables but not other relations. If the passed in object type is another relation, then
2929        // we need to remove any privilege that is not valid for the specified relation.
2930        .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
2931        // Filter out empty privileges.
2932        .filter(|(_, acl_mode)| !acl_mode.is_empty())
2933        .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
2934            grantee: *grantee,
2935            acl_mode,
2936        })
2937    }
2938
2939    pub fn iter(
2940        &self,
2941    ) -> impl Iterator<
2942        Item = (
2943            &DefaultPrivilegeObject,
2944            impl Iterator<Item = &DefaultPrivilegeAclItem>,
2945        ),
2946    > {
2947        self.privileges
2948            .iter()
2949            .map(|(object, acl_map)| (object, acl_map.values()))
2950    }
2951}
2952
2953#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
2954pub struct ClusterConfig {
2955    pub variant: ClusterVariant,
2956    pub workload_class: Option<String>,
2957}
2958
2959impl ClusterConfig {
2960    pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
2961        match &self.variant {
2962            ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
2963            ClusterVariant::Unmanaged => None,
2964        }
2965    }
2966}
2967
2968impl From<ClusterConfig> for durable::ClusterConfig {
2969    fn from(config: ClusterConfig) -> Self {
2970        Self {
2971            variant: config.variant.into(),
2972            workload_class: config.workload_class,
2973        }
2974    }
2975}
2976
2977impl From<durable::ClusterConfig> for ClusterConfig {
2978    fn from(config: durable::ClusterConfig) -> Self {
2979        Self {
2980            variant: config.variant.into(),
2981            workload_class: config.workload_class,
2982        }
2983    }
2984}
2985
2986#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
2987pub struct ClusterVariantManaged {
2988    pub size: String,
2989    pub availability_zones: Vec<String>,
2990    pub logging: ReplicaLogging,
2991    pub replication_factor: u32,
2992    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
2993    pub schedule: ClusterSchedule,
2994}
2995
2996impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
2997    fn from(managed: ClusterVariantManaged) -> Self {
2998        Self {
2999            size: managed.size,
3000            availability_zones: managed.availability_zones,
3001            logging: managed.logging,
3002            replication_factor: managed.replication_factor,
3003            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3004            schedule: managed.schedule,
3005        }
3006    }
3007}
3008
3009impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3010    fn from(managed: durable::ClusterVariantManaged) -> Self {
3011        Self {
3012            size: managed.size,
3013            availability_zones: managed.availability_zones,
3014            logging: managed.logging,
3015            replication_factor: managed.replication_factor,
3016            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3017            schedule: managed.schedule,
3018        }
3019    }
3020}
3021
3022#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3023pub enum ClusterVariant {
3024    Managed(ClusterVariantManaged),
3025    Unmanaged,
3026}
3027
3028impl From<ClusterVariant> for durable::ClusterVariant {
3029    fn from(variant: ClusterVariant) -> Self {
3030        match variant {
3031            ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3032            ClusterVariant::Unmanaged => Self::Unmanaged,
3033        }
3034    }
3035}
3036
3037impl From<durable::ClusterVariant> for ClusterVariant {
3038    fn from(variant: durable::ClusterVariant) -> Self {
3039        match variant {
3040            durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3041            durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3042        }
3043    }
3044}
3045
3046impl mz_sql::catalog::CatalogDatabase for Database {
3047    fn name(&self) -> &str {
3048        &self.name
3049    }
3050
3051    fn id(&self) -> DatabaseId {
3052        self.id
3053    }
3054
3055    fn has_schemas(&self) -> bool {
3056        !self.schemas_by_name.is_empty()
3057    }
3058
3059    fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
3060        &self.schemas_by_name
3061    }
3062
3063    // `as` is ok to use to cast to a trait object.
3064    #[allow(clippy::as_conversions)]
3065    fn schemas(&self) -> Vec<&dyn CatalogSchema> {
3066        self.schemas_by_id
3067            .values()
3068            .map(|schema| schema as &dyn CatalogSchema)
3069            .collect()
3070    }
3071
3072    fn owner_id(&self) -> RoleId {
3073        self.owner_id
3074    }
3075
3076    fn privileges(&self) -> &PrivilegeMap {
3077        &self.privileges
3078    }
3079}
3080
3081impl mz_sql::catalog::CatalogSchema for Schema {
3082    fn database(&self) -> &ResolvedDatabaseSpecifier {
3083        &self.name.database
3084    }
3085
3086    fn name(&self) -> &QualifiedSchemaName {
3087        &self.name
3088    }
3089
3090    fn id(&self) -> &SchemaSpecifier {
3091        &self.id
3092    }
3093
3094    fn has_items(&self) -> bool {
3095        !self.items.is_empty()
3096    }
3097
3098    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
3099        Box::new(
3100            self.items
3101                .values()
3102                .chain(self.functions.values())
3103                .chain(self.types.values())
3104                .copied(),
3105        )
3106    }
3107
3108    fn owner_id(&self) -> RoleId {
3109        self.owner_id
3110    }
3111
3112    fn privileges(&self) -> &PrivilegeMap {
3113        &self.privileges
3114    }
3115}
3116
3117impl mz_sql::catalog::CatalogRole for Role {
3118    fn name(&self) -> &str {
3119        &self.name
3120    }
3121
3122    fn id(&self) -> RoleId {
3123        self.id
3124    }
3125
3126    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
3127        &self.membership.map
3128    }
3129
3130    fn attributes(&self) -> &RoleAttributes {
3131        &self.attributes
3132    }
3133
3134    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
3135        &self.vars.map
3136    }
3137}
3138
3139impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
3140    fn name(&self) -> &str {
3141        &self.name
3142    }
3143
3144    fn id(&self) -> NetworkPolicyId {
3145        self.id
3146    }
3147
3148    fn owner_id(&self) -> RoleId {
3149        self.owner_id
3150    }
3151
3152    fn privileges(&self) -> &PrivilegeMap {
3153        &self.privileges
3154    }
3155}
3156
3157impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
3158    fn name(&self) -> &str {
3159        &self.name
3160    }
3161
3162    fn id(&self) -> ClusterId {
3163        self.id
3164    }
3165
3166    fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
3167        &self.bound_objects
3168    }
3169
3170    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
3171        &self.replica_id_by_name_
3172    }
3173
3174    // `as` is ok to use to cast to a trait object.
3175    #[allow(clippy::as_conversions)]
3176    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
3177        self.replicas()
3178            .map(|replica| replica as &dyn CatalogClusterReplica)
3179            .collect()
3180    }
3181
3182    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
3183        self.replica(id).expect("catalog out of sync")
3184    }
3185
3186    fn owner_id(&self) -> RoleId {
3187        self.owner_id
3188    }
3189
3190    fn privileges(&self) -> &PrivilegeMap {
3191        &self.privileges
3192    }
3193
3194    fn is_managed(&self) -> bool {
3195        self.is_managed()
3196    }
3197
3198    fn managed_size(&self) -> Option<&str> {
3199        match &self.config.variant {
3200            ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
3201            _ => None,
3202        }
3203    }
3204
3205    fn schedule(&self) -> Option<&ClusterSchedule> {
3206        match &self.config.variant {
3207            ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
3208            _ => None,
3209        }
3210    }
3211
3212    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
3213        self.try_to_plan()
3214    }
3215}
3216
3217impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
3218    fn name(&self) -> &str {
3219        &self.name
3220    }
3221
3222    fn cluster_id(&self) -> ClusterId {
3223        self.cluster_id
3224    }
3225
3226    fn replica_id(&self) -> ReplicaId {
3227        self.replica_id
3228    }
3229
3230    fn owner_id(&self) -> RoleId {
3231        self.owner_id
3232    }
3233
3234    fn internal(&self) -> bool {
3235        self.config.location.internal()
3236    }
3237}
3238
3239impl mz_sql::catalog::CatalogItem for CatalogEntry {
3240    fn name(&self) -> &QualifiedItemName {
3241        self.name()
3242    }
3243
3244    fn id(&self) -> CatalogItemId {
3245        self.id()
3246    }
3247
3248    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3249        Box::new(self.global_ids())
3250    }
3251
3252    fn oid(&self) -> u32 {
3253        self.oid()
3254    }
3255
3256    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3257        self.func()
3258    }
3259
3260    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3261        self.source_desc()
3262    }
3263
3264    fn connection(
3265        &self,
3266    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3267    {
3268        Ok(self.connection()?.details.to_connection())
3269    }
3270
3271    fn create_sql(&self) -> &str {
3272        match self.item() {
3273            CatalogItem::Table(Table { create_sql, .. }) => {
3274                create_sql.as_deref().unwrap_or("<builtin>")
3275            }
3276            CatalogItem::Source(Source { create_sql, .. }) => {
3277                create_sql.as_deref().unwrap_or("<builtin>")
3278            }
3279            CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3280            CatalogItem::View(View { create_sql, .. }) => create_sql,
3281            CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3282            CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3283            CatalogItem::Type(Type { create_sql, .. }) => {
3284                create_sql.as_deref().unwrap_or("<builtin>")
3285            }
3286            CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3287            CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3288            CatalogItem::Func(_) => "<builtin>",
3289            CatalogItem::Log(_) => "<builtin>",
3290            CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
3291        }
3292    }
3293
3294    fn item_type(&self) -> SqlCatalogItemType {
3295        self.item().typ()
3296    }
3297
3298    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3299        if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3300            Some((keys, *on))
3301        } else {
3302            None
3303        }
3304    }
3305
3306    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3307        if let CatalogItem::Table(Table {
3308            data_source: TableDataSource::TableWrites { defaults },
3309            ..
3310        }) = self.item()
3311        {
3312            Some(defaults.as_slice())
3313        } else {
3314            None
3315        }
3316    }
3317
3318    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3319        if let CatalogItem::Type(Type { details, .. }) = self.item() {
3320            Some(details)
3321        } else {
3322            None
3323        }
3324    }
3325
3326    fn references(&self) -> &ResolvedIds {
3327        self.references()
3328    }
3329
3330    fn uses(&self) -> BTreeSet<CatalogItemId> {
3331        self.uses()
3332    }
3333
3334    fn referenced_by(&self) -> &[CatalogItemId] {
3335        self.referenced_by()
3336    }
3337
3338    fn used_by(&self) -> &[CatalogItemId] {
3339        self.used_by()
3340    }
3341
3342    fn subsource_details(
3343        &self,
3344    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3345        self.subsource_details()
3346    }
3347
3348    fn source_export_details(
3349        &self,
3350    ) -> Option<(
3351        CatalogItemId,
3352        &UnresolvedItemName,
3353        &SourceExportDetails,
3354        &SourceExportDataConfig<ReferencedConnection>,
3355    )> {
3356        self.source_export_details()
3357    }
3358
3359    fn is_progress_source(&self) -> bool {
3360        self.is_progress_source()
3361    }
3362
3363    fn progress_id(&self) -> Option<CatalogItemId> {
3364        self.progress_id()
3365    }
3366
3367    fn owner_id(&self) -> RoleId {
3368        self.owner_id
3369    }
3370
3371    fn privileges(&self) -> &PrivilegeMap {
3372        &self.privileges
3373    }
3374
3375    fn cluster_id(&self) -> Option<ClusterId> {
3376        self.item().cluster_id()
3377    }
3378
3379    fn at_version(
3380        &self,
3381        version: RelationVersionSelector,
3382    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3383        Box::new(CatalogCollectionEntry {
3384            entry: self.clone(),
3385            version,
3386        })
3387    }
3388
3389    fn latest_version(&self) -> Option<RelationVersion> {
3390        self.table().map(|t| t.desc.latest_version())
3391    }
3392}
3393
3394/// A single update to the catalog state.
3395#[derive(Debug)]
3396pub struct StateUpdate {
3397    pub kind: StateUpdateKind,
3398    pub ts: Timestamp,
3399    pub diff: StateDiff,
3400}
3401
3402/// The contents of a single state update.
3403///
3404/// Variants are listed in dependency order.
3405#[derive(Debug, Clone)]
3406pub enum StateUpdateKind {
3407    Role(durable::objects::Role),
3408    RoleAuth(durable::objects::RoleAuth),
3409    Database(durable::objects::Database),
3410    Schema(durable::objects::Schema),
3411    DefaultPrivilege(durable::objects::DefaultPrivilege),
3412    SystemPrivilege(MzAclItem),
3413    SystemConfiguration(durable::objects::SystemConfiguration),
3414    Cluster(durable::objects::Cluster),
3415    NetworkPolicy(durable::objects::NetworkPolicy),
3416    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3417    ClusterReplica(durable::objects::ClusterReplica),
3418    SourceReferences(durable::objects::SourceReferences),
3419    SystemObjectMapping(durable::objects::SystemObjectMapping),
3420    // Temporary items are not actually updated via the durable catalog, but this allows us to
3421    // model them the same way as all other items.
3422    TemporaryItem(TemporaryItem),
3423    Item(durable::objects::Item),
3424    Comment(durable::objects::Comment),
3425    AuditLog(durable::objects::AuditLog),
3426    // Storage updates.
3427    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3428    UnfinalizedShard(durable::objects::UnfinalizedShard),
3429}
3430
3431/// Valid diffs for catalog state updates.
3432#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
3433pub enum StateDiff {
3434    Retraction,
3435    Addition,
3436}
3437
3438impl From<StateDiff> for Diff {
3439    fn from(diff: StateDiff) -> Self {
3440        match diff {
3441            StateDiff::Retraction => Diff::MINUS_ONE,
3442            StateDiff::Addition => Diff::ONE,
3443        }
3444    }
3445}
3446impl TryFrom<Diff> for StateDiff {
3447    type Error = String;
3448
3449    fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3450        match diff {
3451            Diff::MINUS_ONE => Ok(Self::Retraction),
3452            Diff::ONE => Ok(Self::Addition),
3453            diff => Err(format!("invalid diff {diff}")),
3454        }
3455    }
3456}
3457
3458/// Information needed to process an update to a temporary item.
3459#[derive(Debug, Clone)]
3460pub struct TemporaryItem {
3461    pub id: CatalogItemId,
3462    pub oid: u32,
3463    pub name: QualifiedItemName,
3464    pub item: CatalogItem,
3465    pub owner_id: RoleId,
3466    pub privileges: PrivilegeMap,
3467}
3468
3469impl From<CatalogEntry> for TemporaryItem {
3470    fn from(entry: CatalogEntry) -> Self {
3471        TemporaryItem {
3472            id: entry.id,
3473            oid: entry.oid,
3474            name: entry.name,
3475            item: entry.item,
3476            owner_id: entry.owner_id,
3477            privileges: entry.privileges,
3478        }
3479    }
3480}
3481
3482/// The same as [`StateUpdateKind`], but without `TemporaryItem` so we can derive [`Ord`].
3483#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
3484pub enum BootstrapStateUpdateKind {
3485    Role(durable::objects::Role),
3486    RoleAuth(durable::objects::RoleAuth),
3487    Database(durable::objects::Database),
3488    Schema(durable::objects::Schema),
3489    DefaultPrivilege(durable::objects::DefaultPrivilege),
3490    SystemPrivilege(MzAclItem),
3491    SystemConfiguration(durable::objects::SystemConfiguration),
3492    Cluster(durable::objects::Cluster),
3493    NetworkPolicy(durable::objects::NetworkPolicy),
3494    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3495    ClusterReplica(durable::objects::ClusterReplica),
3496    SourceReferences(durable::objects::SourceReferences),
3497    SystemObjectMapping(durable::objects::SystemObjectMapping),
3498    Item(durable::objects::Item),
3499    Comment(durable::objects::Comment),
3500    AuditLog(durable::objects::AuditLog),
3501    // Storage updates.
3502    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3503    UnfinalizedShard(durable::objects::UnfinalizedShard),
3504}
3505
3506impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3507    fn from(value: BootstrapStateUpdateKind) -> Self {
3508        match value {
3509            BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3510            BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3511            BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3512            BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3513            BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3514                StateUpdateKind::DefaultPrivilege(kind)
3515            }
3516            BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3517                StateUpdateKind::SystemPrivilege(kind)
3518            }
3519            BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3520                StateUpdateKind::SystemConfiguration(kind)
3521            }
3522            BootstrapStateUpdateKind::SourceReferences(kind) => {
3523                StateUpdateKind::SourceReferences(kind)
3524            }
3525            BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3526            BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3527            BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3528                StateUpdateKind::IntrospectionSourceIndex(kind)
3529            }
3530            BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3531            BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3532                StateUpdateKind::SystemObjectMapping(kind)
3533            }
3534            BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3535            BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3536            BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3537            BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3538                StateUpdateKind::StorageCollectionMetadata(kind)
3539            }
3540            BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3541                StateUpdateKind::UnfinalizedShard(kind)
3542            }
3543        }
3544    }
3545}
3546
3547impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3548    type Error = TemporaryItem;
3549
3550    fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3551        match value {
3552            StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3553            StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3554            StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3555            StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3556            StateUpdateKind::DefaultPrivilege(kind) => {
3557                Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3558            }
3559            StateUpdateKind::SystemPrivilege(kind) => {
3560                Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3561            }
3562            StateUpdateKind::SystemConfiguration(kind) => {
3563                Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3564            }
3565            StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3566            StateUpdateKind::NetworkPolicy(kind) => {
3567                Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3568            }
3569            StateUpdateKind::IntrospectionSourceIndex(kind) => {
3570                Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3571            }
3572            StateUpdateKind::ClusterReplica(kind) => {
3573                Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3574            }
3575            StateUpdateKind::SourceReferences(kind) => {
3576                Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3577            }
3578            StateUpdateKind::SystemObjectMapping(kind) => {
3579                Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3580            }
3581            StateUpdateKind::TemporaryItem(kind) => Err(kind),
3582            StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3583            StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3584            StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3585            StateUpdateKind::StorageCollectionMetadata(kind) => {
3586                Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3587            }
3588            StateUpdateKind::UnfinalizedShard(kind) => {
3589                Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3590            }
3591        }
3592    }
3593}