mz_catalog/memory/
objects.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The current types used by the in-memory Catalog. Many of the objects in this module are
11//! extremely similar to the objects found in [`crate::durable::objects`] but in a format that is
12//! easier consumed by higher layers.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18
19use chrono::{DateTime, Utc};
20use mz_adapter_types::compaction::CompactionWindow;
21use mz_adapter_types::connection::ConnectionId;
22use mz_compute_client::logging::LogVariant;
23use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
24use mz_controller_types::{ClusterId, ReplicaId};
25use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
26use mz_ore::collections::CollectionExt;
27use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
28use mz_repr::network_policy_id::NetworkPolicyId;
29use mz_repr::optimize::OptimizerFeatureOverrides;
30use mz_repr::refresh_schedule::RefreshSchedule;
31use mz_repr::role_id::RoleId;
32use mz_repr::{
33    CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
34    RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
35};
36use mz_sql::ast::display::AstDisplay;
37use mz_sql::ast::{
38    ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
39    UnresolvedItemName, Value, WithOptionValue,
40};
41use mz_sql::catalog::{
42    CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
43    CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogTypeDetails,
44    DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, RoleAttributes, RoleMembership,
45    RoleVars, SystemObjectType,
46};
47use mz_sql::names::{
48    Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
49    QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
50};
51use mz_sql::plan::{
52    ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
53    CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
54    HirRelationExpr, NetworkPolicyRule, PlanError, WebhookBodyFormat, WebhookHeaders,
55    WebhookValidation,
56};
57use mz_sql::rbac;
58use mz_sql::session::vars::OwnedVarInput;
59use mz_storage_client::controller::IntrospectionType;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
62use mz_storage_types::sources::load_generator::LoadGenerator;
63use mz_storage_types::sources::{
64    GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
65    SourceExportDetails, Timeline,
66};
67use serde::ser::SerializeSeq;
68use serde::{Deserialize, Serialize};
69use timely::progress::Antichain;
70use tracing::debug;
71
72use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
73use crate::durable;
74
/// Used to update `self` from the input value while consuming the input value.
///
/// Like [`From`], but updates an existing value in place instead of
/// constructing a fresh one; implementors below refresh their fields from the
/// durable representation while keeping in-memory-only fields intact.
pub trait UpdateFrom<T>: From<T> {
    /// Updates `self` from `from`, consuming `from`.
    fn update_from(&mut self, from: T);
}
79
/// In-memory representation of a database.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    /// Name of the database.
    pub name: String,
    /// Stable ID of the database.
    pub id: DatabaseId,
    /// OID of the database.
    pub oid: u32,
    /// Schemas contained in this database, keyed by ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Schema name -> ID index for name resolution.
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    /// Role that owns this database.
    pub owner_id: RoleId,
    /// Privileges granted on this database.
    pub privileges: PrivilegeMap,
}
91
92impl From<Database> for durable::Database {
93    fn from(database: Database) -> durable::Database {
94        durable::Database {
95            id: database.id,
96            oid: database.oid,
97            name: database.name,
98            owner_id: database.owner_id,
99            privileges: database.privileges.into_all_values().collect(),
100        }
101    }
102}
103
104impl From<durable::Database> for Database {
105    fn from(
106        durable::Database {
107            id,
108            oid,
109            name,
110            owner_id,
111            privileges,
112        }: durable::Database,
113    ) -> Database {
114        Database {
115            id,
116            oid,
117            schemas_by_id: BTreeMap::new(),
118            schemas_by_name: BTreeMap::new(),
119            name,
120            owner_id,
121            privileges: PrivilegeMap::from_mz_acl_items(privileges),
122        }
123    }
124}
125
126impl UpdateFrom<durable::Database> for Database {
127    fn update_from(
128        &mut self,
129        durable::Database {
130            id,
131            oid,
132            name,
133            owner_id,
134            privileges,
135        }: durable::Database,
136    ) {
137        self.id = id;
138        self.oid = oid;
139        self.name = name;
140        self.owner_id = owner_id;
141        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
142    }
143}
144
/// In-memory representation of a schema.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
    /// Fully qualified name (database + schema).
    pub name: QualifiedSchemaName,
    /// Specifier identifying this schema.
    pub id: SchemaSpecifier,
    /// OID of the schema.
    pub oid: u32,
    /// Non-function, non-type items in this schema, keyed by name.
    pub items: BTreeMap<String, CatalogItemId>,
    /// Functions in this schema, keyed by name.
    pub functions: BTreeMap<String, CatalogItemId>,
    /// Types in this schema, keyed by name.
    pub types: BTreeMap<String, CatalogItemId>,
    /// Role that owns this schema.
    pub owner_id: RoleId,
    /// Privileges granted on this schema.
    pub privileges: PrivilegeMap,
}
156
157impl From<Schema> for durable::Schema {
158    fn from(schema: Schema) -> durable::Schema {
159        durable::Schema {
160            id: schema.id.into(),
161            oid: schema.oid,
162            name: schema.name.schema,
163            database_id: schema.name.database.id(),
164            owner_id: schema.owner_id,
165            privileges: schema.privileges.into_all_values().collect(),
166        }
167    }
168}
169
170impl From<durable::Schema> for Schema {
171    fn from(
172        durable::Schema {
173            id,
174            oid,
175            name,
176            database_id,
177            owner_id,
178            privileges,
179        }: durable::Schema,
180    ) -> Schema {
181        Schema {
182            name: QualifiedSchemaName {
183                database: database_id.into(),
184                schema: name,
185            },
186            id: id.into(),
187            oid,
188            items: BTreeMap::new(),
189            functions: BTreeMap::new(),
190            types: BTreeMap::new(),
191            owner_id,
192            privileges: PrivilegeMap::from_mz_acl_items(privileges),
193        }
194    }
195}
196
197impl UpdateFrom<durable::Schema> for Schema {
198    fn update_from(
199        &mut self,
200        durable::Schema {
201            id,
202            oid,
203            name,
204            database_id,
205            owner_id,
206            privileges,
207        }: durable::Schema,
208    ) {
209        self.name = QualifiedSchemaName {
210            database: database_id.into(),
211            schema: name,
212        };
213        self.id = id.into();
214        self.oid = oid;
215        self.owner_id = owner_id;
216        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
217    }
218}
219
/// In-memory representation of a role.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    /// Name of the role.
    pub name: String,
    /// Stable ID of the role.
    pub id: RoleId,
    /// OID of the role.
    pub oid: u32,
    /// Attributes of the role (e.g. whether it can create objects).
    pub attributes: RoleAttributes,
    /// Roles this role is a member of.
    pub membership: RoleMembership,
    /// Per-role session variable defaults.
    pub vars: RoleVars,
}
229
230impl Role {
231    pub fn is_user(&self) -> bool {
232        self.id.is_user()
233    }
234
235    pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
236        self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
237    }
238}
239
240impl From<Role> for durable::Role {
241    fn from(role: Role) -> durable::Role {
242        durable::Role {
243            id: role.id,
244            oid: role.oid,
245            name: role.name,
246            attributes: role.attributes,
247            membership: role.membership,
248            vars: role.vars,
249        }
250    }
251}
252
253impl From<durable::Role> for Role {
254    fn from(
255        durable::Role {
256            id,
257            oid,
258            name,
259            attributes,
260            membership,
261            vars,
262        }: durable::Role,
263    ) -> Self {
264        Role {
265            name,
266            id,
267            oid,
268            attributes,
269            membership,
270            vars,
271        }
272    }
273}
274
275impl UpdateFrom<durable::Role> for Role {
276    fn update_from(
277        &mut self,
278        durable::Role {
279            id,
280            oid,
281            name,
282            attributes,
283            membership,
284            vars,
285        }: durable::Role,
286    ) {
287        self.id = id;
288        self.oid = oid;
289        self.name = name;
290        self.attributes = attributes;
291        self.membership = membership;
292        self.vars = vars;
293    }
294}
295
/// In-memory representation of authentication data for a role.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RoleAuth {
    /// The role this auth data belongs to.
    pub role_id: RoleId,
    /// Hashed password, if one is set.
    pub password_hash: Option<String>,
    /// When this record was last updated.
    // NOTE(review): presumably a unix-epoch timestamp — confirm units against
    // the durable layer.
    pub updated_at: u64,
}
302
303impl From<RoleAuth> for durable::RoleAuth {
304    fn from(role_auth: RoleAuth) -> durable::RoleAuth {
305        durable::RoleAuth {
306            role_id: role_auth.role_id,
307            password_hash: role_auth.password_hash,
308            updated_at: role_auth.updated_at,
309        }
310    }
311}
312
313impl From<durable::RoleAuth> for RoleAuth {
314    fn from(
315        durable::RoleAuth {
316            role_id,
317            password_hash,
318            updated_at,
319        }: durable::RoleAuth,
320    ) -> RoleAuth {
321        RoleAuth {
322            role_id,
323            password_hash,
324            updated_at,
325        }
326    }
327}
328
329impl UpdateFrom<durable::RoleAuth> for RoleAuth {
330    fn update_from(&mut self, from: durable::RoleAuth) {
331        self.role_id = from.role_id;
332        self.password_hash = from.password_hash;
333    }
334}
335
/// In-memory representation of a cluster.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    /// Name of the cluster.
    pub name: String,
    /// Stable ID of the cluster.
    pub id: ClusterId,
    /// Managed/unmanaged configuration and workload class.
    pub config: ClusterConfig,
    /// Introspection log -> index mapping for this cluster (not serialized).
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    /// Objects bound to this cluster. Does not include introspection source
    /// indexes.
    pub bound_objects: BTreeSet<CatalogItemId>,
    /// Replica name -> ID index for name resolution.
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    /// Replicas of this cluster, keyed by ID.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    /// Role that owns this cluster.
    pub owner_id: RoleId,
    /// Privileges granted on this cluster.
    pub privileges: PrivilegeMap,
}
352
impl Cluster {
    /// The role of the cluster. Currently used to set alert severity.
    pub fn role(&self) -> ClusterRole {
        // NOTE - These roles power monitoring systems. Do not change
        // them without talking to the cloud or observability groups.
        if self.name == MZ_SYSTEM_CLUSTER.name {
            ClusterRole::SystemCritical
        } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
            ClusterRole::System
        } else {
            ClusterRole::User
        }
    }

    /// Returns `true` if the cluster is a managed cluster.
    pub fn is_managed(&self) -> bool {
        matches!(self.config.variant, ClusterVariant::Managed { .. })
    }

    /// Lists the user replicas, which are those that do not have the internal flag set.
    pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.replicas().filter(|r| !r.config.location.internal())
    }

    /// Lists all replicas in the cluster
    pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.replicas_by_id_.values()
    }

    /// Lookup a replica by ID.
    pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
        self.replicas_by_id_.get(&replica_id)
    }

    /// Lookup a replica ID by name.
    pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
        self.replica_id_by_name_.get(name).copied()
    }

    /// Returns the availability zones of this cluster, if they exist.
    pub fn availability_zones(&self) -> Option<&[String]> {
        match &self.config.variant {
            ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
            ClusterVariant::Unmanaged => None,
        }
    }

    /// Attempts to reconstruct a [`CreateClusterPlan`] describing this
    /// cluster, e.g. to power `SHOW CREATE`-style output.
    ///
    /// # Errors
    ///
    /// Returns [`PlanError::Unsupported`] for unmanaged clusters.
    pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
        let name = self.name.clone();
        let variant = match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged {
                size,
                availability_zones,
                logging,
                replication_factor,
                optimizer_feature_overrides,
                schedule,
            }) => {
                // An absent logging interval maps to "no introspection
                // config"; `log_logging` is ignored in that case.
                let introspection = match logging {
                    ReplicaLogging {
                        log_logging,
                        interval: Some(interval),
                    } => Some(ComputeReplicaIntrospectionConfig {
                        debugging: *log_logging,
                        interval: interval.clone(),
                    }),
                    ReplicaLogging {
                        log_logging: _,
                        interval: None,
                    } => None,
                };
                let compute = ComputeReplicaConfig { introspection };
                CreateClusterVariant::Managed(CreateClusterManagedPlan {
                    replication_factor: replication_factor.clone(),
                    size: size.clone(),
                    availability_zones: availability_zones.clone(),
                    compute,
                    optimizer_feature_overrides: optimizer_feature_overrides.clone(),
                    schedule: schedule.clone(),
                })
            }
            ClusterVariant::Unmanaged => {
                // Unmanaged clusters are deprecated, so hopefully we can remove
                // them before we have to implement this.
                return Err(PlanError::Unsupported {
                    feature: "SHOW CREATE for unmanaged clusters".to_string(),
                    discussion_no: None,
                });
            }
        };
        let workload_class = self.config.workload_class.clone();
        Ok(CreateClusterPlan {
            name,
            variant,
            workload_class,
        })
    }
}
451
452impl From<Cluster> for durable::Cluster {
453    fn from(cluster: Cluster) -> durable::Cluster {
454        durable::Cluster {
455            id: cluster.id,
456            name: cluster.name,
457            owner_id: cluster.owner_id,
458            privileges: cluster.privileges.into_all_values().collect(),
459            config: cluster.config.into(),
460        }
461    }
462}
463
464impl From<durable::Cluster> for Cluster {
465    fn from(
466        durable::Cluster {
467            id,
468            name,
469            owner_id,
470            privileges,
471            config,
472        }: durable::Cluster,
473    ) -> Self {
474        Cluster {
475            name: name.clone(),
476            id,
477            bound_objects: BTreeSet::new(),
478            log_indexes: BTreeMap::new(),
479            replica_id_by_name_: BTreeMap::new(),
480            replicas_by_id_: BTreeMap::new(),
481            owner_id,
482            privileges: PrivilegeMap::from_mz_acl_items(privileges),
483            config: config.into(),
484        }
485    }
486}
487
488impl UpdateFrom<durable::Cluster> for Cluster {
489    fn update_from(
490        &mut self,
491        durable::Cluster {
492            id,
493            name,
494            owner_id,
495            privileges,
496            config,
497        }: durable::Cluster,
498    ) {
499        self.id = id;
500        self.name = name;
501        self.owner_id = owner_id;
502        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
503        self.config = config.into();
504    }
505}
506
/// In-memory representation of a cluster replica.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    /// Name of the replica.
    pub name: String,
    /// The cluster this replica belongs to.
    pub cluster_id: ClusterId,
    /// Stable ID of the replica.
    pub replica_id: ReplicaId,
    /// Location and compute configuration of the replica.
    pub config: ReplicaConfig,
    /// Role that owns this replica.
    pub owner_id: RoleId,
}
515
516impl From<ClusterReplica> for durable::ClusterReplica {
517    fn from(replica: ClusterReplica) -> durable::ClusterReplica {
518        durable::ClusterReplica {
519            cluster_id: replica.cluster_id,
520            replica_id: replica.replica_id,
521            name: replica.name,
522            config: replica.config.into(),
523            owner_id: replica.owner_id,
524        }
525    }
526}
527
/// The status of a single cluster replica process at a point in time.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    /// The reported status.
    pub status: ClusterStatus,
    /// When this status was recorded.
    pub time: DateTime<Utc>,
}
533
/// The set of external references known for a source.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
    /// When the references were last fetched.
    // NOTE(review): presumably a unix-epoch timestamp — confirm units against
    // the durable layer.
    pub updated_at: u64,
    /// The individual references.
    pub references: Vec<SourceReference>,
}
539
/// A single external object referenced by a source.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
    /// Name of the referenced object.
    pub name: String,
    /// Namespace (e.g. schema) of the referenced object, if any.
    pub namespace: Option<String>,
    /// Column names of the referenced object.
    pub columns: Vec<String>,
}
546
547impl From<SourceReference> for durable::SourceReference {
548    fn from(source_reference: SourceReference) -> durable::SourceReference {
549        durable::SourceReference {
550            name: source_reference.name,
551            namespace: source_reference.namespace,
552            columns: source_reference.columns,
553        }
554    }
555}
556
557impl SourceReferences {
558    pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
559        durable::SourceReferences {
560            source_id,
561            updated_at: self.updated_at,
562            references: self.references.into_iter().map(Into::into).collect(),
563        }
564    }
565}
566
567impl From<durable::SourceReference> for SourceReference {
568    fn from(source_reference: durable::SourceReference) -> SourceReference {
569        SourceReference {
570            name: source_reference.name,
571            namespace: source_reference.namespace,
572            columns: source_reference.columns,
573        }
574    }
575}
576
577impl From<durable::SourceReferences> for SourceReferences {
578    fn from(source_references: durable::SourceReferences) -> SourceReferences {
579        SourceReferences {
580            updated_at: source_references.updated_at,
581            references: source_references
582                .references
583                .into_iter()
584                .map(|source_reference| source_reference.into())
585                .collect(),
586        }
587    }
588}
589
590impl From<mz_sql::plan::SourceReference> for SourceReference {
591    fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
592        SourceReference {
593            name: source_reference.name,
594            namespace: source_reference.namespace,
595            columns: source_reference.columns,
596        }
597    }
598}
599
600impl From<mz_sql::plan::SourceReferences> for SourceReferences {
601    fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
602        SourceReferences {
603            updated_at: source_references.updated_at,
604            references: source_references
605                .references
606                .into_iter()
607                .map(|source_reference| source_reference.into())
608                .collect(),
609        }
610    }
611}
612
613impl From<SourceReferences> for mz_sql::plan::SourceReferences {
614    fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
615        mz_sql::plan::SourceReferences {
616            updated_at: source_references.updated_at,
617            references: source_references
618                .references
619                .into_iter()
620                .map(|source_reference| source_reference.into())
621                .collect(),
622        }
623    }
624}
625
626impl From<SourceReference> for mz_sql::plan::SourceReference {
627    fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
628        mz_sql::plan::SourceReference {
629            name: source_reference.name,
630            namespace: source_reference.namespace,
631            columns: source_reference.columns,
632        }
633    }
634}
635
/// An entry in the in-memory catalog: an item plus its identity, ownership,
/// and dependency bookkeeping.
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    /// The item itself (table, source, view, etc.).
    pub item: CatalogItem,
    /// Items that reference this entry (not serialized).
    // NOTE(review): confirm the exact distinction between `referenced_by` and
    // `used_by`; it is not visible from this file alone.
    #[serde(skip)]
    pub referenced_by: Vec<CatalogItemId>,
    // TODO(database-issues#7922)––this should have an invariant tied to it that all
    // dependents (i.e. entries in this field) have IDs greater than this
    // entry's ID.
    /// Items that depend on this entry (not serialized).
    #[serde(skip)]
    pub used_by: Vec<CatalogItemId>,
    /// Stable ID of this item.
    pub id: CatalogItemId,
    /// OID of this item.
    pub oid: u32,
    /// Fully qualified name of this item.
    pub name: QualifiedItemName,
    /// Role that owns this item.
    pub owner_id: RoleId,
    /// Privileges granted on this item.
    pub privileges: PrivilegeMap,
}
652
/// A [`CatalogEntry`] that is associated with a specific "collection" of data.
/// A single item in the catalog may be associated with multiple "collections".
///
/// Here "collection" generally means a pTVC, e.g. a Persist Shard, an Index, a
/// currently running dataflow, etc.
///
/// Items in the Catalog have a stable name -> ID mapping, in other words for
/// the entire lifetime of an object its [`CatalogItemId`] will _never_ change.
/// Similarly, we need to maintain a stable mapping from [`GlobalId`] to pTVC.
/// This presents a challenge when `ALTER`-ing an object, e.g. adding columns
/// to a table. We can't just change the schema of the underlying Persist Shard
/// because that would be rebinding the [`GlobalId`] of the pTVC. Instead we
/// allocate a new [`GlobalId`] to refer to the new version of the table, and
/// then the [`CatalogEntry`] tracks the [`GlobalId`] for each version.
///
/// TODO(ct): Add a note here if we end up using this for associating continual
/// tasks with a single catalog item.
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
    /// The underlying catalog entry.
    pub entry: CatalogEntry,
    /// Which version of the entry's relation this collection refers to.
    pub version: RelationVersionSelector,
}
675
676impl CatalogCollectionEntry {
677    pub fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
678        self.item().desc(name, self.version)
679    }
680
681    pub fn desc_opt(&self) -> Option<Cow<'_, RelationDesc>> {
682        self.item().desc_opt(self.version)
683    }
684}
685
impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
    fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
        CatalogCollectionEntry::desc(self, name)
    }

    /// Returns the [`GlobalId`] of the collection selected by `self.version`.
    fn global_id(&self) -> GlobalId {
        match self.entry.item() {
            // Every non-table item carries a single `global_id`, so the
            // version selector does not apply to these variants.
            CatalogItem::Source(source) => source.global_id,
            CatalogItem::Log(log) => log.global_id,
            CatalogItem::View(view) => view.global_id,
            CatalogItem::MaterializedView(mv) => mv.global_id,
            CatalogItem::Sink(sink) => sink.global_id,
            CatalogItem::Index(index) => index.global_id,
            CatalogItem::Type(ty) => ty.global_id,
            CatalogItem::Func(func) => func.global_id,
            CatalogItem::Secret(secret) => secret.global_id,
            CatalogItem::Connection(conn) => conn.global_id,
            CatalogItem::ContinualTask(ct) => ct.global_id,
            // Tables may have multiple versions, each with its own
            // `GlobalId`; resolve via the version selector.
            CatalogItem::Table(table) => match self.version {
                RelationVersionSelector::Latest => {
                    // `collections` is a `BTreeMap` keyed by version, so the
                    // last entry is the latest version.
                    let (_version, gid) = table
                        .collections
                        .last_key_value()
                        .expect("at least one version");
                    *gid
                }
                RelationVersionSelector::Specific(version) => table
                    .collections
                    .get(&version)
                    .expect("catalog corruption, missing version!")
                    .clone(),
            },
        }
    }
}
721
722impl Deref for CatalogCollectionEntry {
723    type Target = CatalogEntry;
724
725    fn deref(&self) -> &CatalogEntry {
726        &self.entry
727    }
728}
729
// `CatalogCollectionEntry` wraps a `CatalogEntry` plus a version selector, so
// nearly every `CatalogItem` method delegates to the wrapped entry. The
// exceptions are `cluster_id` (which goes through the inner item) and
// `at_version` (which clones the entry with a new selector).
impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
    fn name(&self) -> &QualifiedItemName {
        self.entry.name()
    }

    fn id(&self) -> CatalogItemId {
        self.entry.id()
    }

    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.entry.global_ids())
    }

    fn oid(&self) -> u32 {
        self.entry.oid()
    }

    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.entry.func()
    }

    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.entry.source_desc()
    }

    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        // Fully qualified to disambiguate from the inherent method namespace.
        mz_sql::catalog::CatalogItem::connection(&self.entry)
    }

    fn create_sql(&self) -> &str {
        self.entry.create_sql()
    }

    fn item_type(&self) -> SqlCatalogItemType {
        self.entry.item_type()
    }

    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        self.entry.index_details()
    }

    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        self.entry.writable_table_details()
    }

    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        self.entry.type_details()
    }

    fn references(&self) -> &ResolvedIds {
        self.entry.references()
    }

    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.entry.uses()
    }

    fn referenced_by(&self) -> &[CatalogItemId] {
        self.entry.referenced_by()
    }

    fn used_by(&self) -> &[CatalogItemId] {
        self.entry.used_by()
    }

    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.entry.subsource_details()
    }

    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.entry.source_export_details()
    }

    fn is_progress_source(&self) -> bool {
        self.entry.is_progress_source()
    }

    fn progress_id(&self) -> Option<CatalogItemId> {
        self.entry.progress_id()
    }

    fn owner_id(&self) -> RoleId {
        *self.entry.owner_id()
    }

    fn privileges(&self) -> &PrivilegeMap {
        self.entry.privileges()
    }

    fn cluster_id(&self) -> Option<ClusterId> {
        self.entry.item().cluster_id()
    }

    /// Re-binds this entry to a different version selector.
    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        Box::new(CatalogCollectionEntry {
            entry: self.entry.clone(),
            version,
        })
    }

    fn latest_version(&self) -> Option<RelationVersion> {
        self.entry.latest_version()
    }
}
849
/// The in-memory representation of a catalog item, with one variant per
/// catalog item type.
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    Table(Table),
    Source(Source),
    Log(Log),
    View(View),
    MaterializedView(MaterializedView),
    Sink(Sink),
    Index(Index),
    Type(Type),
    Func(Func),
    Secret(Secret),
    Connection(Connection),
    ContinualTask(ContinualTask),
}
865
866impl From<CatalogEntry> for durable::Item {
867    fn from(entry: CatalogEntry) -> durable::Item {
868        let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
869        durable::Item {
870            id: entry.id,
871            oid: entry.oid,
872            global_id,
873            schema_id: entry.name.qualifiers.schema_spec.into(),
874            name: entry.name.item,
875            create_sql,
876            owner_id: entry.owner_id,
877            privileges: entry.privileges.into_all_values().collect(),
878            extra_versions,
879        }
880    }
881}
882
#[derive(Debug, Clone, Serialize)]
pub struct Table {
    /// Parse-able SQL that defines this table.
    pub create_sql: Option<String>,
    /// [`VersionedRelationDesc`] of this table, derived from the `create_sql`.
    pub desc: VersionedRelationDesc,
    /// Versions of this table, and the [`GlobalId`]s that refer to them.
    ///
    /// Ordered by version; the greatest key is the latest version.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this table, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the table's logical compaction window is controlled by the ['metrics_retention']
    /// session variable.
    ///
    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
    /// Where data for this table comes from, e.g. `INSERT` statements or an upstream source.
    pub data_source: TableDataSource,
}
907
908impl Table {
909    pub fn timeline(&self) -> Timeline {
910        match &self.data_source {
911            // The Coordinator controls insertions for writable tables
912            // (including system tables), so they are realtime.
913            TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
914            TableDataSource::DataSource { timeline, .. } => timeline.clone(),
915        }
916    }
917
918    /// Returns all of the [`GlobalId`]s that this [`Table`] can be referenced by.
919    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
920        self.collections.values().copied()
921    }
922
923    /// Returns the latest [`GlobalId`] for this [`Table`] which should be used for writes.
924    pub fn global_id_writes(&self) -> GlobalId {
925        self.collections
926            .last_key_value()
927            .expect("at least one version of a table")
928            .1
929            .clone()
930    }
931
932    /// Returns all of the collections and their [`RelationDesc`]s associated with this [`Table`].
933    pub fn collection_descs(
934        &self,
935    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
936        self.collections.iter().map(|(version, gid)| {
937            let desc = self
938                .desc
939                .at_version(RelationVersionSelector::Specific(*version));
940            (*gid, *version, desc)
941        })
942    }
943
944    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
945    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
946        let (version, _gid) = self
947            .collections
948            .iter()
949            .find(|(_version, gid)| *gid == id)
950            .expect("GlobalId to exist");
951        self.desc
952            .at_version(RelationVersionSelector::Specific(*version))
953    }
954}
955
#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
    /// The table owns data created via INSERT/UPDATE/DELETE statements.
    TableWrites {
        /// Default expressions for the table's columns; not serialized.
        #[serde(skip)]
        defaults: Vec<Expr<Aug>>,
    },

    /// The table receives its data from the identified `DataSourceDesc`.
    /// This table type does not support INSERT/UPDATE/DELETE statements.
    DataSource {
        /// Description of where the table's data comes from.
        desc: DataSourceDesc,
        /// The timeline the data is associated with.
        timeline: Timeline,
    },
}
971
#[derive(Debug, Clone, Serialize)]
pub enum DataSourceDesc {
    /// Receives data from an external system
    Ingestion {
        /// Description of the source and its connection.
        desc: SourceDesc<ReferencedConnection>,
        /// The cluster this ingestion runs on.
        cluster_id: ClusterId,
    },
    /// Receives data from an external system
    OldSyntaxIngestion {
        /// Description of the source and its connection.
        desc: SourceDesc<ReferencedConnection>,
        /// The cluster this ingestion runs on.
        cluster_id: ClusterId,
        /// If we're dealing with an old syntax ingestion the progress id will be some other
        /// collection and the ingestion itself will have the data from an external reference.
        progress_subsource: CatalogItemId,
        data_config: SourceExportDataConfig<ReferencedConnection>,
        details: SourceExportDetails,
    },
    /// This source receives its data from the identified ingestion,
    /// specifically the output identified by `external_reference`.
    /// N.B. that `external_reference` should not be used to identify
    /// anything downstream of purification, as the purification process
    /// encodes source-specific identifiers into the `details` struct.
    /// The `external_reference` field is only used here for displaying
    /// human-readable names in system tables.
    IngestionExport {
        ingestion_id: CatalogItemId,
        external_reference: UnresolvedItemName,
        details: SourceExportDetails,
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// Receives introspection data from an internal system
    Introspection(IntrospectionType),
    /// Receives data from the source's reclocking/remapping operations.
    Progress,
    /// Receives data from HTTP requests.
    Webhook {
        /// Optional components used to validate a webhook request.
        validate_using: Option<WebhookValidation>,
        /// Describes how we deserialize the body of a webhook request.
        body_format: WebhookBodyFormat,
        /// Describes whether or not to include headers and how to map them.
        headers: WebhookHeaders,
        /// The cluster which this source is associated with.
        cluster_id: ClusterId,
    },
}
1018
1019impl DataSourceDesc {
1020    /// The key and value formats of the data source.
1021    pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1022        match &self {
1023            DataSourceDesc::Ingestion { .. } => (None, None),
1024            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1025                match &data_config.encoding.as_ref() {
1026                    Some(encoding) => match &encoding.key {
1027                        Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1028                        None => (None, Some(encoding.value.type_())),
1029                    },
1030                    None => (None, None),
1031                }
1032            }
1033            DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1034                Some(encoding) => match &encoding.key {
1035                    Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1036                    None => (None, Some(encoding.value.type_())),
1037                },
1038                None => (None, None),
1039            },
1040            DataSourceDesc::Introspection(_)
1041            | DataSourceDesc::Webhook { .. }
1042            | DataSourceDesc::Progress => (None, None),
1043        }
1044    }
1045
1046    /// Envelope of the data source.
1047    pub fn envelope(&self) -> Option<&str> {
1048        // Note how "none"/"append-only" is different from `None`. Source
1049        // sources don't have an envelope (internal logs, for example), while
1050        // other sources have an envelope that we call the "NONE"-envelope.
1051
1052        fn envelope_string(envelope: &SourceEnvelope) -> &str {
1053            match envelope {
1054                SourceEnvelope::None(_) => "none",
1055                SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1056                    mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1057                    mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1058                        // NOTE(aljoscha): Should we somehow mark that this is
1059                        // using upsert internally? See note above about
1060                        // DEBEZIUM.
1061                        "debezium"
1062                    }
1063                    mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1064                        "upsert-value-err-inline"
1065                    }
1066                },
1067                SourceEnvelope::CdcV2 => {
1068                    // TODO(aljoscha): Should we even report this? It's
1069                    // currently not exposed.
1070                    "materialize"
1071                }
1072            }
1073        }
1074
1075        match self {
1076            // NOTE(aljoscha): We could move the block for ingestions into
1077            // `SourceEnvelope` itself, but that one feels more like an internal
1078            // thing and adapter should own how we represent envelopes as a
1079            // string? It would not be hard to convince me otherwise, though.
1080            DataSourceDesc::Ingestion { .. } => None,
1081            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1082                Some(envelope_string(&data_config.envelope))
1083            }
1084            DataSourceDesc::IngestionExport { data_config, .. } => {
1085                Some(envelope_string(&data_config.envelope))
1086            }
1087            DataSourceDesc::Introspection(_)
1088            | DataSourceDesc::Webhook { .. }
1089            | DataSourceDesc::Progress => None,
1090        }
1091    }
1092}
1093
#[derive(Debug, Clone, Serialize)]
pub struct Source {
    /// Parse-able SQL that defines this source.
    pub create_sql: Option<String>,
    /// [`GlobalId`] used to reference this source from outside the catalog.
    pub global_id: GlobalId,
    /// Where the data for this source comes from.
    // TODO: Unskip: currently blocked on some inner BTreeMap<X, _> problems.
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    /// [`RelationDesc`] of this source, derived from the `create_sql`.
    pub desc: RelationDesc,
    /// The timeline this source exists on.
    pub timeline: Timeline,
    /// Other catalog objects referenced by this source, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// This value is ignored for subsources, i.e. for
    /// [`DataSourceDesc::IngestionExport`]. Instead, it uses the primary
    /// source's logical compaction window.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the source's logical compaction window is controlled by
    /// METRICS_RETENTION
    pub is_retained_metrics_object: bool,
}
1117
impl Source {
    /// Creates a new `Source`.
    ///
    /// # Panics
    /// - If an ingestion-based plan is not given a cluster_id.
    /// - If a non-ingestion-based source has a defined cluster config in its plan.
    /// - If a non-ingestion-based source is given a cluster_id.
    pub fn new(
        plan: CreateSourcePlan,
        global_id: GlobalId,
        resolved_ids: ResolvedIds,
        custom_logical_compaction_window: Option<CompactionWindow>,
        is_retained_metrics_object: bool,
    ) -> Source {
        Source {
            create_sql: Some(plan.source.create_sql),
            // Translate the planner's `DataSourceDesc` into the catalog's
            // representation, attaching the plan's cluster ID where required.
            data_source: match plan.source.data_source {
                mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
                    desc,
                    cluster_id: plan
                        .in_cluster
                        .expect("ingestion-based sources must be given a cluster ID"),
                },
                mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
                    desc,
                    progress_subsource,
                    data_config,
                    details,
                } => DataSourceDesc::OldSyntaxIngestion {
                    desc,
                    cluster_id: plan
                        .in_cluster
                        .expect("ingestion-based sources must be given a cluster ID"),
                    progress_subsource,
                    data_config,
                    details,
                },
                mz_sql::plan::DataSourceDesc::Progress => {
                    assert!(
                        plan.in_cluster.is_none(),
                        "subsources must not have a host config or cluster_id defined"
                    );
                    DataSourceDesc::Progress
                }
                mz_sql::plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => {
                    assert!(
                        plan.in_cluster.is_none(),
                        "subsources must not have a host config or cluster_id defined"
                    );
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    }
                }
                mz_sql::plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id,
                } => {
                    // The plan should carry the cluster in `in_cluster`, not in
                    // the webhook variant itself; log (not panic) if it does.
                    mz_ore::soft_assert_or_log!(
                        cluster_id.is_none(),
                        "cluster_id set at Source level for Webhooks"
                    );
                    DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id: plan
                            .in_cluster
                            .expect("webhook sources must be given a cluster ID"),
                    }
                }
            },
            desc: plan.source.desc,
            global_id,
            timeline: plan.timeline,
            resolved_ids,
            // The window from the plan takes precedence over the
            // caller-provided default.
            custom_logical_compaction_window: plan
                .source
                .compaction_window
                .or(custom_logical_compaction_window),
            is_retained_metrics_object,
        }
    }

    /// Type of the source.
    pub fn source_type(&self) -> &str {
        match &self.data_source {
            DataSourceDesc::Ingestion { desc, .. }
            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
            DataSourceDesc::Progress => "progress",
            DataSourceDesc::IngestionExport { .. } => "subsource",
            DataSourceDesc::Introspection(_) => "source",
            DataSourceDesc::Webhook { .. } => "webhook",
        }
    }

    /// Connection ID of the source, if one exists.
    pub fn connection_id(&self) -> Option<CatalogItemId> {
        match &self.data_source {
            DataSourceDesc::Ingestion { desc, .. }
            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
            DataSourceDesc::IngestionExport { .. }
            | DataSourceDesc::Introspection(_)
            | DataSourceDesc::Webhook { .. }
            | DataSourceDesc::Progress => None,
        }
    }

    /// The single [`GlobalId`] that refers to this Source.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }

    /// The expensive resource that each source consumes is persist shards. To
    /// prevent abuse, we want to prevent users from creating sources that use an
    /// unbounded number of persist shards. But we also don't want to count
    /// persist shards that are mandated by the system (e.g., the progress
    /// shard) so that future versions of Materialize can introduce additional
    /// per-source shards (e.g., a per-source status shard) without impacting
    /// the limit calculation.
    pub fn user_controllable_persist_shard_count(&self) -> i64 {
        match &self.data_source {
            DataSourceDesc::Ingestion { .. } => 0,
            DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                match &desc.connection {
                    // These multi-output sources do not use their primary
                    // source's data shard, so we don't include it in accounting
                    // for users.
                    GenericSourceConnection::Postgres(_)
                    | GenericSourceConnection::MySql(_)
                    | GenericSourceConnection::SqlServer(_) => 0,
                    GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
                        // Load generators that output data in their primary shard
                        LoadGenerator::Clock
                        | LoadGenerator::Counter { .. }
                        | LoadGenerator::Datums
                        | LoadGenerator::KeyValue(_) => 1,
                        LoadGenerator::Auction
                        | LoadGenerator::Marketing
                        | LoadGenerator::Tpch { .. } => 0,
                    },
                    GenericSourceConnection::Kafka(_) => 1,
                }
            }
            //  DataSourceDesc::IngestionExport represents a subsource, which
            //  use a data shard.
            DataSourceDesc::IngestionExport { .. } => 1,
            DataSourceDesc::Webhook { .. } => 1,
            // Introspection and progress subsources are not under the user's control, so shouldn't
            // count toward their quota.
            DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => 0,
        }
    }
}
1281
/// An introspection log source, e.g. a compute-layer log.
#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// The category of data this log stores.
    pub variant: LogVariant,
    /// [`GlobalId`] used to reference this log from outside the catalog.
    pub global_id: GlobalId,
}

impl Log {
    /// The single [`GlobalId`] that refers to this Log.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1296
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    /// Parse-able SQL that defines this sink.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this sink from outside the catalog, e.g. storage.
    pub global_id: GlobalId,
    /// Collection we read into this sink.
    pub from: GlobalId,
    /// Connection to the external service we're sinking into, e.g. Kafka.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// Envelope we use to sink into the external system.
    ///
    /// TODO(guswynn): this probably should just be in the `connection`.
    pub envelope: SinkEnvelope,
    /// Emit an initial snapshot into the sink.
    pub with_snapshot: bool,
    /// Used to fence other writes into this sink as we evolve the upstream materialized view.
    pub version: u64,
    /// Other catalog objects this sink references.
    pub resolved_ids: ResolvedIds,
    /// Cluster this sink runs on.
    pub cluster_id: ClusterId,
}
1320
1321impl Sink {
1322    pub fn sink_type(&self) -> &str {
1323        self.connection.name()
1324    }
1325
1326    /// Envelope of the sink.
1327    pub fn envelope(&self) -> Option<&str> {
1328        match &self.envelope {
1329            SinkEnvelope::Debezium => Some("debezium"),
1330            SinkEnvelope::Upsert => Some("upsert"),
1331        }
1332    }
1333
1334    /// Output a combined format string of the sink. For legacy reasons
1335    /// if the key-format is none or the key & value formats are
1336    /// both the same (either avro or json), we return the value format name,
1337    /// otherwise we return a composite name.
1338    pub fn combined_format(&self) -> Option<Cow<'_, str>> {
1339        match &self.connection {
1340            StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
1341            _ => None,
1342        }
1343    }
1344
1345    /// Output distinct key_format and value_format of the sink.
1346    pub fn formats(&self) -> Option<(Option<&str>, &str)> {
1347        match &self.connection {
1348            StorageSinkConnection::Kafka(connection) => {
1349                let key_format = connection
1350                    .format
1351                    .key_format
1352                    .as_ref()
1353                    .map(|f| f.get_format_name());
1354                let value_format = connection.format.value_format.get_format_name();
1355                Some((key_format, value_format))
1356            }
1357            _ => None,
1358        }
1359    }
1360
1361    pub fn connection_id(&self) -> Option<CatalogItemId> {
1362        self.connection.connection_id()
1363    }
1364
1365    /// The single [`GlobalId`] that this Sink can be referenced by.
1366    pub fn global_id(&self) -> GlobalId {
1367        self.global_id
1368    }
1369}
1370
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// Parse-able SQL that defines this view.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this view from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// Unoptimized high-level expression from parsing the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression from (locally) optimizing the `raw_expr`.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Columns of this view.
    pub desc: RelationDesc,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects that are referenced by this view, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this view.
    pub dependencies: DependencyIds,
}

impl View {
    /// The single [`GlobalId`] this [`View`] can be referenced by.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1397
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// Parse-able SQL that defines this materialized view.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this materialized view from outside the catalog.
    pub global_id: GlobalId,
    /// Raw high-level expression from planning, derived from the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression, derived from the `raw_expr`.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Columns for this materialized view.
    pub desc: RelationDesc,
    /// Other catalog items that this materialized view references, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this view.
    pub dependencies: DependencyIds,
    /// Cluster that this materialized view runs on.
    pub cluster_id: ClusterId,
    /// Column indexes that we assert are not `NULL`.
    ///
    /// TODO(parkmycar): Switch this to use the `ColumnIdx` type.
    pub non_null_assertions: Vec<usize>,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Schedule to refresh this materialized view, e.g. set via `REFRESH EVERY` option.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// The initial `as_of` of the storage collection associated with the materialized view.
    ///
    /// Note: This doesn't change upon restarts.
    /// (The dataflow's initial `as_of` can be different.)
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}

impl MaterializedView {
    /// The single [`GlobalId`] this [`MaterializedView`] can be referenced by.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1437
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// Parse-able SQL that defines this index.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this index from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// The [`GlobalId`] this Index is on.
    pub on: GlobalId,
    /// Keys of the index.
    pub keys: Arc<[MirScalarExpr]>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this index, e.g. the object we're indexing.
    pub resolved_ids: ResolvedIds,
    /// Cluster this index is installed on.
    pub cluster_id: ClusterId,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the index's logical compaction window is controlled by the ['metrics_retention']
    /// session variable.
    ///
    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
}

impl Index {
    /// The [`GlobalId`] that refers to this Index.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1469
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// Parse-able SQL that defines this type.
    pub create_sql: Option<String>,
    /// [`GlobalId`] used to reference this type from outside the catalog.
    pub global_id: GlobalId,
    /// Details about the type's structure; not serialized.
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// [`RelationDesc`] associated with this type, if any.
    pub desc: Option<RelationDesc>,
    /// Other catalog objects referenced by this type.
    pub resolved_ids: ResolvedIds,
}
1482
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// Static definition of the function; not serialized.
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
    /// [`GlobalId`] used to reference this function from outside the catalog.
    pub global_id: GlobalId,
}
1491
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// Parse-able SQL that defines this secret.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this secret from outside the catalog.
    pub global_id: GlobalId,
}
1499
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// Parse-able SQL that defines this connection.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this connection from the storage layer.
    pub global_id: GlobalId,
    /// The kind of connection.
    pub details: ConnectionDetails,
    /// Other objects this connection depends on.
    pub resolved_ids: ResolvedIds,
}

impl Connection {
    /// The single [`GlobalId`] used to reference this connection.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1518
#[derive(Debug, Clone, Serialize)]
pub struct ContinualTask {
    /// Parse-able SQL that defines this continual task.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this continual task from outside the catalog.
    pub global_id: GlobalId,
    /// [`GlobalId`] of the collection that we read into this continual task.
    pub input_id: GlobalId,
    /// Whether to read an initial snapshot of the input.
    /// NOTE(review): presumably mirrors `Sink::with_snapshot` — confirm.
    pub with_snapshot: bool,
    /// ContinualTasks are self-referential. We make this work by using a
    /// placeholder `LocalId` for the CT itself through name resolution and
    /// planning. Then we fill in the real `GlobalId` before constructing this
    /// catalog item.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Columns for this continual task.
    pub desc: RelationDesc,
    /// Other catalog items that this continual task references, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this continual task.
    pub dependencies: DependencyIds,
    /// Cluster that this continual task runs on.
    pub cluster_id: ClusterId,
    /// See the comment on [MaterializedView::initial_as_of].
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}

impl ContinualTask {
    /// The single [`GlobalId`] used to reference this continual task.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1551
/// In-memory representation of a network policy.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
    /// Name of this network policy.
    pub name: String,
    /// Stable ID of this network policy.
    pub id: NetworkPolicyId,
    /// The OID for this network policy.
    pub oid: u32,
    /// The rules that make up this network policy.
    pub rules: Vec<NetworkPolicyRule>,
    /// The role that owns this network policy.
    pub owner_id: RoleId,
    /// Privileges granted on this network policy.
    pub privileges: PrivilegeMap,
}
1561
1562impl From<NetworkPolicy> for durable::NetworkPolicy {
1563    fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1564        durable::NetworkPolicy {
1565            id: policy.id,
1566            oid: policy.oid,
1567            name: policy.name,
1568            rules: policy.rules,
1569            owner_id: policy.owner_id,
1570            privileges: policy.privileges.into_all_values().collect(),
1571        }
1572    }
1573}
1574
1575impl From<durable::NetworkPolicy> for NetworkPolicy {
1576    fn from(
1577        durable::NetworkPolicy {
1578            id,
1579            oid,
1580            name,
1581            rules,
1582            owner_id,
1583            privileges,
1584        }: durable::NetworkPolicy,
1585    ) -> Self {
1586        NetworkPolicy {
1587            id,
1588            oid,
1589            name,
1590            rules,
1591            owner_id,
1592            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1593        }
1594    }
1595}
1596
1597impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1598    fn update_from(
1599        &mut self,
1600        durable::NetworkPolicy {
1601            id,
1602            oid,
1603            name,
1604            rules,
1605            owner_id,
1606            privileges,
1607        }: durable::NetworkPolicy,
1608    ) {
1609        self.id = id;
1610        self.oid = oid;
1611        self.name = name;
1612        self.rules = rules;
1613        self.owner_id = owner_id;
1614        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1615    }
1616}
1617
1618impl CatalogItem {
1619    /// Returns a string indicating the type of this catalog entry.
1620    pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1621        match self {
1622            CatalogItem::Table(_) => mz_sql::catalog::CatalogItemType::Table,
1623            CatalogItem::Source(_) => mz_sql::catalog::CatalogItemType::Source,
1624            CatalogItem::Log(_) => mz_sql::catalog::CatalogItemType::Source,
1625            CatalogItem::Sink(_) => mz_sql::catalog::CatalogItemType::Sink,
1626            CatalogItem::View(_) => mz_sql::catalog::CatalogItemType::View,
1627            CatalogItem::MaterializedView(_) => mz_sql::catalog::CatalogItemType::MaterializedView,
1628            CatalogItem::Index(_) => mz_sql::catalog::CatalogItemType::Index,
1629            CatalogItem::Type(_) => mz_sql::catalog::CatalogItemType::Type,
1630            CatalogItem::Func(_) => mz_sql::catalog::CatalogItemType::Func,
1631            CatalogItem::Secret(_) => mz_sql::catalog::CatalogItemType::Secret,
1632            CatalogItem::Connection(_) => mz_sql::catalog::CatalogItemType::Connection,
1633            CatalogItem::ContinualTask(_) => mz_sql::catalog::CatalogItemType::ContinualTask,
1634        }
1635    }
1636
1637    /// Returns the [`GlobalId`]s that reference this item, if any.
1638    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1639        let gid = match self {
1640            CatalogItem::Source(source) => source.global_id,
1641            CatalogItem::Log(log) => log.global_id,
1642            CatalogItem::Sink(sink) => sink.global_id,
1643            CatalogItem::View(view) => view.global_id,
1644            CatalogItem::MaterializedView(mv) => mv.global_id,
1645            CatalogItem::ContinualTask(ct) => ct.global_id,
1646            CatalogItem::Index(index) => index.global_id,
1647            CatalogItem::Func(func) => func.global_id,
1648            CatalogItem::Type(ty) => ty.global_id,
1649            CatalogItem::Secret(secret) => secret.global_id,
1650            CatalogItem::Connection(conn) => conn.global_id,
1651            CatalogItem::Table(table) => {
1652                return itertools::Either::Left(table.collections.values().copied());
1653            }
1654        };
1655        itertools::Either::Right(std::iter::once(gid))
1656    }
1657
1658    /// Returns the most up-to-date [`GlobalId`] for this item.
1659    ///
1660    /// Note: The only type of object that can have multiple [`GlobalId`]s are tables.
1661    pub fn latest_global_id(&self) -> GlobalId {
1662        match self {
1663            CatalogItem::Source(source) => source.global_id,
1664            CatalogItem::Log(log) => log.global_id,
1665            CatalogItem::Sink(sink) => sink.global_id,
1666            CatalogItem::View(view) => view.global_id,
1667            CatalogItem::MaterializedView(mv) => mv.global_id,
1668            CatalogItem::ContinualTask(ct) => ct.global_id,
1669            CatalogItem::Index(index) => index.global_id,
1670            CatalogItem::Func(func) => func.global_id,
1671            CatalogItem::Type(ty) => ty.global_id,
1672            CatalogItem::Secret(secret) => secret.global_id,
1673            CatalogItem::Connection(conn) => conn.global_id,
1674            CatalogItem::Table(table) => table.global_id_writes(),
1675        }
1676    }
1677
1678    /// Whether this item represents a storage collection.
1679    pub fn is_storage_collection(&self) -> bool {
1680        match self {
1681            CatalogItem::Table(_)
1682            | CatalogItem::Source(_)
1683            | CatalogItem::MaterializedView(_)
1684            | CatalogItem::Sink(_)
1685            | CatalogItem::ContinualTask(_) => true,
1686            CatalogItem::Log(_)
1687            | CatalogItem::View(_)
1688            | CatalogItem::Index(_)
1689            | CatalogItem::Type(_)
1690            | CatalogItem::Func(_)
1691            | CatalogItem::Secret(_)
1692            | CatalogItem::Connection(_) => false,
1693        }
1694    }
1695
1696    pub fn desc(
1697        &self,
1698        name: &FullItemName,
1699        version: RelationVersionSelector,
1700    ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
1701        self.desc_opt(version)
1702            .ok_or_else(|| SqlCatalogError::InvalidDependency {
1703                name: name.to_string(),
1704                typ: self.typ(),
1705            })
1706    }
1707
1708    pub fn desc_opt(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1709        match &self {
1710            CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1711            CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1712            CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1713            CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1714            CatalogItem::MaterializedView(mview) => Some(Cow::Borrowed(&mview.desc)),
1715            CatalogItem::Type(typ) => typ.desc.as_ref().map(Cow::Borrowed),
1716            CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
1717            CatalogItem::Func(_)
1718            | CatalogItem::Index(_)
1719            | CatalogItem::Sink(_)
1720            | CatalogItem::Secret(_)
1721            | CatalogItem::Connection(_) => None,
1722        }
1723    }
1724
1725    pub fn func(
1726        &self,
1727        entry: &CatalogEntry,
1728    ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1729        match &self {
1730            CatalogItem::Func(func) => Ok(func.inner),
1731            _ => Err(SqlCatalogError::UnexpectedType {
1732                name: entry.name().item.to_string(),
1733                actual_type: entry.item_type(),
1734                expected_type: CatalogItemType::Func,
1735            }),
1736        }
1737    }
1738
1739    pub fn source_desc(
1740        &self,
1741        entry: &CatalogEntry,
1742    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1743        match &self {
1744            CatalogItem::Source(source) => match &source.data_source {
1745                DataSourceDesc::Ingestion { desc, .. }
1746                | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1747                DataSourceDesc::IngestionExport { .. }
1748                | DataSourceDesc::Introspection(_)
1749                | DataSourceDesc::Webhook { .. }
1750                | DataSourceDesc::Progress => Ok(None),
1751            },
1752            _ => Err(SqlCatalogError::UnexpectedType {
1753                name: entry.name().item.to_string(),
1754                actual_type: entry.item_type(),
1755                expected_type: CatalogItemType::Source,
1756            }),
1757        }
1758    }
1759
1760    /// Reports whether this catalog entry is a progress source.
1761    pub fn is_progress_source(&self) -> bool {
1762        matches!(
1763            self,
1764            CatalogItem::Source(Source {
1765                data_source: DataSourceDesc::Progress,
1766                ..
1767            })
1768        )
1769    }
1770
1771    /// Collects the identifiers of the objects that were encountered when resolving names in the
1772    /// item's DDL statement.
1773    pub fn references(&self) -> &ResolvedIds {
1774        static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1775        match self {
1776            CatalogItem::Func(_) => &*EMPTY,
1777            CatalogItem::Index(idx) => &idx.resolved_ids,
1778            CatalogItem::Sink(sink) => &sink.resolved_ids,
1779            CatalogItem::Source(source) => &source.resolved_ids,
1780            CatalogItem::Log(_) => &*EMPTY,
1781            CatalogItem::Table(table) => &table.resolved_ids,
1782            CatalogItem::Type(typ) => &typ.resolved_ids,
1783            CatalogItem::View(view) => &view.resolved_ids,
1784            CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1785            CatalogItem::Secret(_) => &*EMPTY,
1786            CatalogItem::Connection(connection) => &connection.resolved_ids,
1787            CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
1788        }
1789    }
1790
1791    /// Collects the identifiers of the objects used by this [`CatalogItem`].
1792    ///
1793    /// Like [`CatalogItem::references()`] but also includes objects that are not directly
1794    /// referenced. For example this will include any catalog objects used to implement functions
1795    /// and casts in the item.
1796    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1797        let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1798        match self {
1799            // TODO(jkosh44) This isn't really correct for functions. They may use other objects in
1800            // their implementation. However, currently there's no way to get that information.
1801            CatalogItem::Func(_) => {}
1802            CatalogItem::Index(_) => {}
1803            CatalogItem::Sink(_) => {}
1804            CatalogItem::Source(_) => {}
1805            CatalogItem::Log(_) => {}
1806            CatalogItem::Table(_) => {}
1807            CatalogItem::Type(_) => {}
1808            CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1809            CatalogItem::MaterializedView(mview) => {
1810                uses.extend(mview.dependencies.0.iter().copied())
1811            }
1812            CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
1813            CatalogItem::Secret(_) => {}
1814            CatalogItem::Connection(_) => {}
1815        }
1816        uses
1817    }
1818
1819    /// Returns the connection ID that this item belongs to, if this item is
1820    /// temporary.
1821    pub fn conn_id(&self) -> Option<&ConnectionId> {
1822        match self {
1823            CatalogItem::View(view) => view.conn_id.as_ref(),
1824            CatalogItem::Index(index) => index.conn_id.as_ref(),
1825            CatalogItem::Table(table) => table.conn_id.as_ref(),
1826            CatalogItem::Log(_)
1827            | CatalogItem::Source(_)
1828            | CatalogItem::Sink(_)
1829            | CatalogItem::MaterializedView(_)
1830            | CatalogItem::Secret(_)
1831            | CatalogItem::Type(_)
1832            | CatalogItem::Func(_)
1833            | CatalogItem::Connection(_)
1834            | CatalogItem::ContinualTask(_) => None,
1835        }
1836    }
1837
    /// Indicates whether this item is temporary or not.
    ///
    /// An item is temporary exactly when it is scoped to a single connection;
    /// see [`CatalogItem::conn_id`].
    pub fn is_temporary(&self) -> bool {
        self.conn_id().is_some()
    }
1842
1843    pub fn rename_schema_refs(
1844        &self,
1845        database_name: &str,
1846        cur_schema_name: &str,
1847        new_schema_name: &str,
1848    ) -> Result<CatalogItem, (String, String)> {
1849        let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
1850            let mut create_stmt = mz_sql::parse::parse(&create_sql)
1851                .expect("invalid create sql persisted to catalog")
1852                .into_element()
1853                .ast;
1854
1855            // Rename all references to cur_schema_name.
1856            mz_sql::ast::transform::create_stmt_rename_schema_refs(
1857                &mut create_stmt,
1858                database_name,
1859                cur_schema_name,
1860                new_schema_name,
1861            )?;
1862
1863            Ok(create_stmt.to_ast_string_stable())
1864        };
1865
1866        match self {
1867            CatalogItem::Table(i) => {
1868                let mut i = i.clone();
1869                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1870                Ok(CatalogItem::Table(i))
1871            }
1872            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1873            CatalogItem::Source(i) => {
1874                let mut i = i.clone();
1875                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1876                Ok(CatalogItem::Source(i))
1877            }
1878            CatalogItem::Sink(i) => {
1879                let mut i = i.clone();
1880                i.create_sql = do_rewrite(i.create_sql)?;
1881                Ok(CatalogItem::Sink(i))
1882            }
1883            CatalogItem::View(i) => {
1884                let mut i = i.clone();
1885                i.create_sql = do_rewrite(i.create_sql)?;
1886                Ok(CatalogItem::View(i))
1887            }
1888            CatalogItem::MaterializedView(i) => {
1889                let mut i = i.clone();
1890                i.create_sql = do_rewrite(i.create_sql)?;
1891                Ok(CatalogItem::MaterializedView(i))
1892            }
1893            CatalogItem::Index(i) => {
1894                let mut i = i.clone();
1895                i.create_sql = do_rewrite(i.create_sql)?;
1896                Ok(CatalogItem::Index(i))
1897            }
1898            CatalogItem::Secret(i) => {
1899                let mut i = i.clone();
1900                i.create_sql = do_rewrite(i.create_sql)?;
1901                Ok(CatalogItem::Secret(i))
1902            }
1903            CatalogItem::Connection(i) => {
1904                let mut i = i.clone();
1905                i.create_sql = do_rewrite(i.create_sql)?;
1906                Ok(CatalogItem::Connection(i))
1907            }
1908            CatalogItem::Type(i) => {
1909                let mut i = i.clone();
1910                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1911                Ok(CatalogItem::Type(i))
1912            }
1913            CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
1914            CatalogItem::ContinualTask(i) => {
1915                let mut i = i.clone();
1916                i.create_sql = do_rewrite(i.create_sql)?;
1917                Ok(CatalogItem::ContinualTask(i))
1918            }
1919        }
1920    }
1921
1922    /// Returns a clone of `self` with all instances of `from` renamed to `to`
1923    /// (with the option of including the item's own name) or errors if request
1924    /// is ambiguous.
1925    pub fn rename_item_refs(
1926        &self,
1927        from: FullItemName,
1928        to_item_name: String,
1929        rename_self: bool,
1930    ) -> Result<CatalogItem, String> {
1931        let do_rewrite = |create_sql: String| -> Result<String, String> {
1932            let mut create_stmt = mz_sql::parse::parse(&create_sql)
1933                .expect("invalid create sql persisted to catalog")
1934                .into_element()
1935                .ast;
1936            if rename_self {
1937                mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
1938            }
1939            // Determination of what constitutes an ambiguous request is done here.
1940            mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
1941            Ok(create_stmt.to_ast_string_stable())
1942        };
1943
1944        match self {
1945            CatalogItem::Table(i) => {
1946                let mut i = i.clone();
1947                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1948                Ok(CatalogItem::Table(i))
1949            }
1950            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1951            CatalogItem::Source(i) => {
1952                let mut i = i.clone();
1953                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1954                Ok(CatalogItem::Source(i))
1955            }
1956            CatalogItem::Sink(i) => {
1957                let mut i = i.clone();
1958                i.create_sql = do_rewrite(i.create_sql)?;
1959                Ok(CatalogItem::Sink(i))
1960            }
1961            CatalogItem::View(i) => {
1962                let mut i = i.clone();
1963                i.create_sql = do_rewrite(i.create_sql)?;
1964                Ok(CatalogItem::View(i))
1965            }
1966            CatalogItem::MaterializedView(i) => {
1967                let mut i = i.clone();
1968                i.create_sql = do_rewrite(i.create_sql)?;
1969                Ok(CatalogItem::MaterializedView(i))
1970            }
1971            CatalogItem::Index(i) => {
1972                let mut i = i.clone();
1973                i.create_sql = do_rewrite(i.create_sql)?;
1974                Ok(CatalogItem::Index(i))
1975            }
1976            CatalogItem::Secret(i) => {
1977                let mut i = i.clone();
1978                i.create_sql = do_rewrite(i.create_sql)?;
1979                Ok(CatalogItem::Secret(i))
1980            }
1981            CatalogItem::Func(_) | CatalogItem::Type(_) => {
1982                unreachable!("{}s cannot be renamed", self.typ())
1983            }
1984            CatalogItem::Connection(i) => {
1985                let mut i = i.clone();
1986                i.create_sql = do_rewrite(i.create_sql)?;
1987                Ok(CatalogItem::Connection(i))
1988            }
1989            CatalogItem::ContinualTask(i) => {
1990                let mut i = i.clone();
1991                i.create_sql = do_rewrite(i.create_sql)?;
1992                Ok(CatalogItem::ContinualTask(i))
1993            }
1994        }
1995    }
1996
    /// Updates the retain history for an item. Returns the previous retain history value. Returns
    /// an error if this item does not support retain history.
    ///
    /// Both the persisted `create_sql` (the `RETAIN HISTORY` entry in the
    /// statement's `WITH` options) and the in-memory
    /// `custom_logical_compaction_window` are updated. `value` is the raw SQL
    /// option value to record (`None` removes the option); `window` is the
    /// parsed compaction window stored in memory.
    pub fn update_retain_history(
        &mut self,
        value: Option<Value>,
        window: CompactionWindow,
    ) -> Result<Option<WithOptionValue<Raw>>, ()> {
        let update = |mut ast: &mut Statement<Raw>| {
            // Each statement type has unique option types. This macro handles them commonly.
            macro_rules! update_retain_history {
                ( $stmt:ident, $opt:ident, $name:ident ) => {{
                    // Replace or add the option.
                    let pos = $stmt
                        .with_options
                        .iter()
                        // In case there are ever multiple, look for the last one.
                        .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
                    if let Some(value) = value {
                        let next = mz_sql_parser::ast::$opt {
                            name: mz_sql_parser::ast::$name::RetainHistory,
                            value: Some(WithOptionValue::RetainHistoryFor(value)),
                        };
                        if let Some(idx) = pos {
                            // Overwrite in place, handing back the old value.
                            let previous = $stmt.with_options[idx].clone();
                            $stmt.with_options[idx] = next;
                            previous.value
                        } else {
                            $stmt.with_options.push(next);
                            None
                        }
                    } else {
                        // `value` is None: drop the option if present.
                        // NOTE(review): `swap_remove` does not preserve the
                        // relative order of the remaining options —
                        // presumably acceptable for WITH options; confirm.
                        if let Some(idx) = pos {
                            $stmt.with_options.swap_remove(idx).value
                        } else {
                            None
                        }
                    }
                }};
            }
            // Only these four statement kinds support RETAIN HISTORY; any
            // other statement makes `update_sql` bail without changing SQL.
            let previous = match &mut ast {
                Statement::CreateTable(stmt) => {
                    update_retain_history!(stmt, TableOption, TableOptionName)
                }
                Statement::CreateIndex(stmt) => {
                    update_retain_history!(stmt, IndexOption, IndexOptionName)
                }
                Statement::CreateSource(stmt) => {
                    update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
                }
                Statement::CreateMaterializedView(stmt) => {
                    update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
                }
                _ => {
                    return Err(());
                }
            };
            Ok(previous)
        };

        let res = self.update_sql(update)?;
        // `update_sql` succeeding implies this is a table, index, source, or
        // materialized view — all of which expose a compaction window slot.
        let cw = self
            .custom_logical_compaction_window_mut()
            .expect("item must have compaction window");
        *cw = Some(window);
        Ok(res)
    }
2063
    /// Adds a column named `name` of type `typ` (with `sql` as its raw SQL
    /// data type) to this item, returning the new [`RelationVersion`].
    ///
    /// Only tables support adding columns; any other item kind results in
    /// `PlanError::Unsupported`.
    pub fn add_column(
        &mut self,
        name: ColumnName,
        typ: SqlColumnType,
        sql: RawDataType,
    ) -> Result<RelationVersion, PlanError> {
        let CatalogItem::Table(table) = self else {
            return Err(PlanError::Unsupported {
                feature: "adding columns to a non-Table".to_string(),
                discussion_no: None,
            });
        };
        // Bump the versioned relation description first, to learn which
        // version the new column belongs to.
        // NOTE(review): if `update_sql` below fails, `table.desc` has already
        // been mutated — confirm callers treat that error path as fatal.
        let next_version = table.desc.add_column(name.clone(), typ);

        // Append a matching column definition — tagged with the version it was
        // added in — to the persisted CREATE TABLE statement.
        let update = |mut ast: &mut Statement<Raw>| match &mut ast {
            Statement::CreateTable(stmt) => {
                let version = ColumnOptionDef {
                    name: None,
                    option: ColumnOption::Versioned {
                        action: ColumnVersioned::Added,
                        version: next_version.into(),
                    },
                };
                let column = ColumnDef {
                    name: name.into(),
                    data_type: sql,
                    collation: None,
                    options: vec![version],
                };
                stmt.columns.push(column);
                Ok(())
            }
            _ => Err(()),
        };

        self.update_sql(update)
            .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
        Ok(next_version)
    }
2103
    /// Updates the create_sql field of this item. Returns an error if this is a builtin item,
    /// otherwise returns f's result.
    ///
    /// The stored SQL is parsed, handed to `f` for in-place mutation, and then
    /// re-serialized (via the stable AST printer) back into the item. If `f`
    /// returns `Err(())`, the stored SQL is left untouched.
    pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
    where
        F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
    {
        // Locate this item's mutable `create_sql`. Tables, types, and sources
        // store it as an Option (absent on builtins); funcs and logs never
        // have SQL; the rest always do.
        let create_sql = match self {
            CatalogItem::Table(Table { create_sql, .. })
            | CatalogItem::Type(Type { create_sql, .. })
            | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
            CatalogItem::Sink(Sink { create_sql, .. })
            | CatalogItem::View(View { create_sql, .. })
            | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
            | CatalogItem::Index(Index { create_sql, .. })
            | CatalogItem::Secret(Secret { create_sql, .. })
            | CatalogItem::Connection(Connection { create_sql, .. })
            | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
            CatalogItem::Func(_) | CatalogItem::Log(_) => None,
        };
        let Some(create_sql) = create_sql else {
            return Err(());
        };
        let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
            .expect("non-system items must be parseable")
            .into_element()
            .ast;
        debug!("rewrite: {}", ast.to_ast_string_redacted());
        let t = f(&mut ast)?;
        *create_sql = ast.to_ast_string_stable();
        debug!("rewrote: {}", ast.to_ast_string_redacted());
        Ok(t)
    }
2136
2137    /// If the object is considered a "compute object"
2138    /// (i.e., it is managed by the compute controller),
2139    /// this function returns its cluster ID. Otherwise, it returns nothing.
2140    ///
2141    /// This function differs from `cluster_id` because while all
2142    /// compute objects run on a cluster, the converse is not true.
2143    pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2144        match self {
2145            CatalogItem::Index(index) => Some(index.cluster_id),
2146            CatalogItem::Table(_)
2147            | CatalogItem::Source(_)
2148            | CatalogItem::Log(_)
2149            | CatalogItem::View(_)
2150            | CatalogItem::MaterializedView(_)
2151            | CatalogItem::Sink(_)
2152            | CatalogItem::Type(_)
2153            | CatalogItem::Func(_)
2154            | CatalogItem::Secret(_)
2155            | CatalogItem::Connection(_)
2156            | CatalogItem::ContinualTask(_) => None,
2157        }
2158    }
2159
2160    pub fn cluster_id(&self) -> Option<ClusterId> {
2161        match self {
2162            CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2163            CatalogItem::Index(index) => Some(index.cluster_id),
2164            CatalogItem::Source(source) => match &source.data_source {
2165                DataSourceDesc::Ingestion { cluster_id, .. }
2166                | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
2167                // This is somewhat of a lie because the export runs on the same
2168                // cluster as its ingestion but we don't yet have a way of
2169                // cross-referencing the items
2170                DataSourceDesc::IngestionExport { .. } => None,
2171                DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2172                DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => None,
2173            },
2174            CatalogItem::Sink(sink) => Some(sink.cluster_id),
2175            CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
2176            CatalogItem::Table(_)
2177            | CatalogItem::Log(_)
2178            | CatalogItem::View(_)
2179            | CatalogItem::Type(_)
2180            | CatalogItem::Func(_)
2181            | CatalogItem::Secret(_)
2182            | CatalogItem::Connection(_) => None,
2183        }
2184    }
2185
2186    /// The custom compaction window, if any has been set. This does not reflect any propagated
2187    /// compaction window (i.e., source -> subsource).
2188    pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2189        match self {
2190            CatalogItem::Table(table) => table.custom_logical_compaction_window,
2191            CatalogItem::Source(source) => source.custom_logical_compaction_window,
2192            CatalogItem::Index(index) => index.custom_logical_compaction_window,
2193            CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2194            CatalogItem::Log(_)
2195            | CatalogItem::View(_)
2196            | CatalogItem::Sink(_)
2197            | CatalogItem::Type(_)
2198            | CatalogItem::Func(_)
2199            | CatalogItem::Secret(_)
2200            | CatalogItem::Connection(_)
2201            | CatalogItem::ContinualTask(_) => None,
2202        }
2203    }
2204
2205    /// Mutable access to the custom compaction window, or None if this type does not support custom
2206    /// compaction windows. This does not reflect any propagated compaction window (i.e., source ->
2207    /// subsource).
2208    pub fn custom_logical_compaction_window_mut(
2209        &mut self,
2210    ) -> Option<&mut Option<CompactionWindow>> {
2211        let cw = match self {
2212            CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2213            CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2214            CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2215            CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2216            CatalogItem::Log(_)
2217            | CatalogItem::View(_)
2218            | CatalogItem::Sink(_)
2219            | CatalogItem::Type(_)
2220            | CatalogItem::Func(_)
2221            | CatalogItem::Secret(_)
2222            | CatalogItem::Connection(_)
2223            | CatalogItem::ContinualTask(_) => return None,
2224        };
2225        Some(cw)
2226    }
2227
2228    /// The initial compaction window, for objects that have one; that is, tables, sources, indexes,
2229    /// and MVs. This does not reflect any propagated compaction window (i.e., source -> subsource).
2230    ///
2231    /// If `custom_logical_compaction_window()` returns something, use that.  Otherwise, use a
2232    /// sensible default (currently 1s).
2233    ///
2234    /// For objects that do not have the concept of compaction window, return None.
2235    pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2236        let custom_logical_compaction_window = match self {
2237            CatalogItem::Table(_)
2238            | CatalogItem::Source(_)
2239            | CatalogItem::Index(_)
2240            | CatalogItem::MaterializedView(_)
2241            | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
2242            CatalogItem::Log(_)
2243            | CatalogItem::View(_)
2244            | CatalogItem::Sink(_)
2245            | CatalogItem::Type(_)
2246            | CatalogItem::Func(_)
2247            | CatalogItem::Secret(_)
2248            | CatalogItem::Connection(_) => return None,
2249        };
2250        Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2251    }
2252
2253    /// Whether the item's logical compaction window
2254    /// is controlled by the METRICS_RETENTION
2255    /// system var.
2256    pub fn is_retained_metrics_object(&self) -> bool {
2257        match self {
2258            CatalogItem::Table(table) => table.is_retained_metrics_object,
2259            CatalogItem::Source(source) => source.is_retained_metrics_object,
2260            CatalogItem::Index(index) => index.is_retained_metrics_object,
2261            CatalogItem::Log(_)
2262            | CatalogItem::View(_)
2263            | CatalogItem::MaterializedView(_)
2264            | CatalogItem::Sink(_)
2265            | CatalogItem::Type(_)
2266            | CatalogItem::Func(_)
2267            | CatalogItem::Secret(_)
2268            | CatalogItem::Connection(_)
2269            | CatalogItem::ContinualTask(_) => false,
2270        }
2271    }
2272
2273    pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2274        match self {
2275            CatalogItem::Table(table) => {
2276                let create_sql = table
2277                    .create_sql
2278                    .clone()
2279                    .expect("builtin tables cannot be serialized");
2280                let mut collections = table.collections.clone();
2281                let global_id = collections
2282                    .remove(&RelationVersion::root())
2283                    .expect("at least one version");
2284                (create_sql, global_id, collections)
2285            }
2286            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2287            CatalogItem::Source(source) => {
2288                assert!(
2289                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2290                    "cannot serialize introspection/builtin sources",
2291                );
2292                let create_sql = source
2293                    .create_sql
2294                    .clone()
2295                    .expect("builtin sources cannot be serialized");
2296                (create_sql, source.global_id, BTreeMap::new())
2297            }
2298            CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2299            CatalogItem::MaterializedView(mview) => {
2300                (mview.create_sql.clone(), mview.global_id, BTreeMap::new())
2301            }
2302            CatalogItem::Index(index) => {
2303                (index.create_sql.clone(), index.global_id, BTreeMap::new())
2304            }
2305            CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2306            CatalogItem::Type(typ) => {
2307                let create_sql = typ
2308                    .create_sql
2309                    .clone()
2310                    .expect("builtin types cannot be serialized");
2311                (create_sql, typ.global_id, BTreeMap::new())
2312            }
2313            CatalogItem::Secret(secret) => {
2314                (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2315            }
2316            CatalogItem::Connection(connection) => (
2317                connection.create_sql.clone(),
2318                connection.global_id,
2319                BTreeMap::new(),
2320            ),
2321            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2322            CatalogItem::ContinualTask(ct) => {
2323                (ct.create_sql.clone(), ct.global_id, BTreeMap::new())
2324            }
2325        }
2326    }
2327
    pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
        // Decomposes the item into its raw `CREATE` SQL, its primary `GlobalId`, and (for
        // tables only) the `GlobalId`s of any additional relation versions. Builtin items
        // carry no `create_sql` and therefore cannot be serialized; we panic/assert on them.
        match self {
            CatalogItem::Table(mut table) => {
                let create_sql = table
                    .create_sql
                    .expect("builtin tables cannot be serialized");
                // The root version is the table's primary collection; the remaining map
                // entries associate later relation versions with their collections.
                let global_id = table
                    .collections
                    .remove(&RelationVersion::root())
                    .expect("at least one version");
                (create_sql, global_id, table.collections)
            }
            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
            CatalogItem::Source(source) => {
                // Introspection sources are builtin and must never reach this path.
                assert!(
                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
                    "cannot serialize introspection/builtin sources",
                );
                let create_sql = source
                    .create_sql
                    .expect("builtin sources cannot be serialized");
                (create_sql, source.global_id, BTreeMap::new())
            }
            // All remaining item types have exactly one collection, so the versions map
            // is always empty for them.
            CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
            CatalogItem::MaterializedView(mview) => {
                (mview.create_sql, mview.global_id, BTreeMap::new())
            }
            CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
            CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
            CatalogItem::Type(typ) => {
                let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
                (create_sql, typ.global_id, BTreeMap::new())
            }
            CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
            CatalogItem::Connection(connection) => {
                (connection.create_sql, connection.global_id, BTreeMap::new())
            }
            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
            CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
        }
    }
2369}
2370
impl CatalogEntry {
    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`],
    /// returning an error if this [`CatalogEntry`] does not produce rows.
    ///
    /// If you need to get the [`RelationDesc`] for a specific version, see [`CatalogItem::desc`].
    pub fn desc_latest(
        &self,
        name: &FullItemName,
    ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
        self.item.desc(name, RelationVersionSelector::Latest)
    }

    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`], if it
    /// produces rows.
    pub fn desc_opt_latest(&self) -> Option<Cow<'_, RelationDesc>> {
        self.item.desc_opt(RelationVersionSelector::Latest)
    }

    /// Reports if the item has columns.
    pub fn has_columns(&self) -> bool {
        self.desc_opt_latest().is_some()
    }

    /// Returns the [`mz_sql::func::Func`] associated with this `CatalogEntry`.
    pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.item.func(self)
    }

    /// Returns the inner [`Index`] if this entry is an index, else `None`.
    pub fn index(&self) -> Option<&Index> {
        match self.item() {
            CatalogItem::Index(idx) => Some(idx),
            _ => None,
        }
    }

    /// Returns the inner [`MaterializedView`] if this entry is a materialized view, else `None`.
    pub fn materialized_view(&self) -> Option<&MaterializedView> {
        match self.item() {
            CatalogItem::MaterializedView(mv) => Some(mv),
            _ => None,
        }
    }

    /// Returns the inner [`Table`] if this entry is a table, else `None`.
    pub fn table(&self) -> Option<&Table> {
        match self.item() {
            CatalogItem::Table(tbl) => Some(tbl),
            _ => None,
        }
    }

    /// Returns the inner [`Source`] if this entry is a source, else `None`.
    pub fn source(&self) -> Option<&Source> {
        match self.item() {
            CatalogItem::Source(src) => Some(src),
            _ => None,
        }
    }

    /// Returns the inner [`Sink`] if this entry is a sink, else `None`.
    pub fn sink(&self) -> Option<&Sink> {
        match self.item() {
            CatalogItem::Sink(sink) => Some(sink),
            _ => None,
        }
    }

    /// Returns the inner [`Secret`] if this entry is a secret, else `None`.
    pub fn secret(&self) -> Option<&Secret> {
        match self.item() {
            CatalogItem::Secret(secret) => Some(secret),
            _ => None,
        }
    }

    /// Returns the inner [`Connection`] if this entry is a connection, otherwise an
    /// [`SqlCatalogError::UnknownConnection`] error naming this entry's qualified name.
    pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
        match self.item() {
            CatalogItem::Connection(connection) => Ok(connection),
            _ => {
                // Ambient (database-less) items render without a database prefix.
                let db_name = match self.name().qualifiers.database_spec {
                    ResolvedDatabaseSpecifier::Ambient => "".to_string(),
                    ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
                };
                Err(SqlCatalogError::UnknownConnection(format!(
                    "{}{}.{}",
                    db_name,
                    self.name().qualifiers.schema_spec,
                    self.name().item
                )))
            }
        }
    }

    /// Returns the [`mz_storage_types::sources::SourceDesc`] associated with
    /// this `CatalogEntry`, if any.
    pub fn source_desc(
        &self,
    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.item.source_desc(self)
    }

    /// Reports whether this catalog entry is a connection.
    pub fn is_connection(&self) -> bool {
        matches!(self.item(), CatalogItem::Connection(_))
    }

    /// Reports whether this catalog entry is a table.
    pub fn is_table(&self) -> bool {
        matches!(self.item(), CatalogItem::Table(_))
    }

    /// Reports whether this catalog entry is a source. Note that this includes
    /// subsources.
    pub fn is_source(&self) -> bool {
        matches!(self.item(), CatalogItem::Source(_))
    }

    /// Reports whether this catalog entry is a subsource and, if it is, the
    /// ingestion it is an export of, as well as the item it exports.
    pub fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        match &self.item() {
            CatalogItem::Source(source) => match &source.data_source {
                DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config: _,
                } => Some((*ingestion_id, external_reference, details)),
                _ => None,
            },
            _ => None,
        }
    }

    /// Reports whether this catalog entry is a source export and, if it is, the
    /// ingestion it is an export of, as well as the item it exports.
    ///
    /// Unlike [`CatalogEntry::subsource_details`], this also covers tables whose data
    /// source is an ingestion export, and includes the export's data config.
    pub fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        match &self.item() {
            CatalogItem::Source(source) => match &source.data_source {
                DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => Some((*ingestion_id, external_reference, details, data_config)),
                _ => None,
            },
            CatalogItem::Table(table) => match &table.data_source {
                TableDataSource::DataSource {
                    desc:
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference,
                            details,
                            data_config,
                        },
                    timeline: _,
                } => Some((*ingestion_id, external_reference, details, data_config)),
                _ => None,
            },
            _ => None,
        }
    }

    /// Reports whether this catalog entry is a progress source.
    pub fn is_progress_source(&self) -> bool {
        self.item().is_progress_source()
    }

    /// Returns the [`CatalogItemId`] of this entry's progress collection, if it has one.
    ///
    /// For new-syntax ingestions this is the entry's own ID; for old-syntax ingestions it
    /// is the dedicated progress subsource. All other items have no progress collection.
    pub fn progress_id(&self) -> Option<CatalogItemId> {
        match &self.item() {
            CatalogItem::Source(source) => match &source.data_source {
                DataSourceDesc::Ingestion { .. } => Some(self.id),
                DataSourceDesc::OldSyntaxIngestion {
                    progress_subsource, ..
                } => Some(*progress_subsource),
                DataSourceDesc::IngestionExport { .. }
                | DataSourceDesc::Introspection(_)
                | DataSourceDesc::Progress
                | DataSourceDesc::Webhook { .. } => None,
            },
            CatalogItem::Table(_)
            | CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Sink(_)
            | CatalogItem::Index(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_)
            | CatalogItem::ContinualTask(_) => None,
        }
    }

    /// Reports whether this catalog entry is a sink.
    pub fn is_sink(&self) -> bool {
        matches!(self.item(), CatalogItem::Sink(_))
    }

    /// Reports whether this catalog entry is a materialized view.
    pub fn is_materialized_view(&self) -> bool {
        matches!(self.item(), CatalogItem::MaterializedView(_))
    }

    /// Reports whether this catalog entry is a view.
    pub fn is_view(&self) -> bool {
        matches!(self.item(), CatalogItem::View(_))
    }

    /// Reports whether this catalog entry is a secret.
    pub fn is_secret(&self) -> bool {
        matches!(self.item(), CatalogItem::Secret(_))
    }

    /// Reports whether this catalog entry is an introspection source.
    pub fn is_introspection_source(&self) -> bool {
        matches!(self.item(), CatalogItem::Log(_))
    }

    /// Reports whether this catalog entry is an index.
    pub fn is_index(&self) -> bool {
        matches!(self.item(), CatalogItem::Index(_))
    }

    /// Reports whether this catalog entry is a continual task.
    pub fn is_continual_task(&self) -> bool {
        matches!(self.item(), CatalogItem::ContinualTask(_))
    }

    /// Reports whether this catalog entry can be treated as a relation, it can produce rows.
    pub fn is_relation(&self) -> bool {
        mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
    }

    /// Collects the identifiers of the objects that were encountered when
    /// resolving names in the item's DDL statement.
    pub fn references(&self) -> &ResolvedIds {
        self.item.references()
    }

    /// Collects the identifiers of the objects used by this [`CatalogEntry`].
    ///
    /// Like [`CatalogEntry::references()`] but also includes objects that are not directly
    /// referenced. For example this will include any catalog objects used to implement functions
    /// and casts in the item.
    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.item.uses()
    }

    /// Returns the `CatalogItem` associated with this catalog entry.
    pub fn item(&self) -> &CatalogItem {
        &self.item
    }

    /// Returns the [`CatalogItemId`] of this catalog entry.
    pub fn id(&self) -> CatalogItemId {
        self.id
    }

    /// Returns all of the [`GlobalId`]s associated with this item.
    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.item().global_ids()
    }

    /// Returns the most recent [`GlobalId`] associated with this item.
    pub fn latest_global_id(&self) -> GlobalId {
        self.item().latest_global_id()
    }

    /// Returns the OID of this catalog entry.
    pub fn oid(&self) -> u32 {
        self.oid
    }

    /// Returns the fully qualified name of this catalog entry.
    pub fn name(&self) -> &QualifiedItemName {
        &self.name
    }

    /// Returns the identifiers of the items that directly reference this catalog entry
    /// (the inverse relationship of [`CatalogEntry::references`]).
    pub fn referenced_by(&self) -> &[CatalogItemId] {
        &self.referenced_by
    }

    /// Returns the identifiers of the items that depend upon this catalog entry.
    pub fn used_by(&self) -> &[CatalogItemId] {
        &self.used_by
    }

    /// Returns the connection ID that this item belongs to, if this item is
    /// temporary.
    pub fn conn_id(&self) -> Option<&ConnectionId> {
        self.item.conn_id()
    }

    /// Returns the role ID of the entry owner.
    pub fn owner_id(&self) -> &RoleId {
        &self.owner_id
    }

    /// Returns the privileges of the entry.
    pub fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
2687
/// In-memory store of comments on catalog objects, keyed by object and then by optional
/// sub-component (e.g. a column position), mapping to the comment text.
#[derive(Debug, Clone, Default)]
pub struct CommentsMap {
    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
}
2692
2693impl CommentsMap {
2694    pub fn update_comment(
2695        &mut self,
2696        object_id: CommentObjectId,
2697        sub_component: Option<usize>,
2698        comment: Option<String>,
2699    ) -> Option<String> {
2700        let object_comments = self.map.entry(object_id).or_default();
2701
2702        // Either replace the existing comment, or remove it if comment is None/NULL.
2703        let (empty, prev) = if let Some(comment) = comment {
2704            let prev = object_comments.insert(sub_component, comment);
2705            (false, prev)
2706        } else {
2707            let prev = object_comments.remove(&sub_component);
2708            (object_comments.is_empty(), prev)
2709        };
2710
2711        // Cleanup entries that are now empty.
2712        if empty {
2713            self.map.remove(&object_id);
2714        }
2715
2716        // Return the previous comment, if there was one, for easy removal.
2717        prev
2718    }
2719
2720    /// Remove all comments for `object_id` from the map.
2721    ///
2722    /// Generally there is one comment for a given [`CommentObjectId`], but in the case of
2723    /// relations you can also have comments on the individual columns. Dropping the comments for a
2724    /// relation will also drop all of the comments on any columns.
2725    pub fn drop_comments(
2726        &mut self,
2727        object_ids: &BTreeSet<CommentObjectId>,
2728    ) -> Vec<(CommentObjectId, Option<usize>, String)> {
2729        let mut removed_comments = Vec::new();
2730
2731        for object_id in object_ids {
2732            if let Some(comments) = self.map.remove(object_id) {
2733                let removed = comments
2734                    .into_iter()
2735                    .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
2736                removed_comments.extend(removed);
2737            }
2738        }
2739
2740        removed_comments
2741    }
2742
2743    pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
2744        self.map
2745            .iter()
2746            .map(|(id, comments)| {
2747                comments
2748                    .iter()
2749                    .map(|(pos, comment)| (*id, *pos, comment.as_str()))
2750            })
2751            .flatten()
2752    }
2753
2754    pub fn get_object_comments(
2755        &self,
2756        object_id: CommentObjectId,
2757    ) -> Option<&BTreeMap<Option<usize>, String>> {
2758        self.map.get(&object_id)
2759    }
2760}
2761
2762impl Serialize for CommentsMap {
2763    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2764    where
2765        S: serde::Serializer,
2766    {
2767        let comment_count = self
2768            .map
2769            .iter()
2770            .map(|(_object_id, comments)| comments.len())
2771            .sum();
2772
2773        let mut seq = serializer.serialize_seq(Some(comment_count))?;
2774        for (object_id, sub) in &self.map {
2775            for (sub_component, comment) in sub {
2776                seq.serialize_element(&(
2777                    format!("{object_id:?}"),
2778                    format!("{sub_component:?}"),
2779                    comment,
2780                ))?;
2781            }
2782        }
2783        seq.end()
2784    }
2785}
2786
/// The set of default privileges: for each object pattern, the privileges that will be
/// granted on matching new objects, organized per grantee role.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
pub struct DefaultPrivileges {
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
}
2792
// Use a newtype here because otherwise we would have two levels of BTreeMap, both needing
// the `map_key_to_string` serializer applied.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
struct RoleDefaultPrivileges(
    /// Denormalized, the key is the grantee Role.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
);
2801
// Let the newtype be used directly as its inner `BTreeMap` for read access.
impl Deref for RoleDefaultPrivileges {
    type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
2809
// Let the newtype be used directly as its inner `BTreeMap` for mutable access.
impl DerefMut for RoleDefaultPrivileges {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
2815
2816impl DefaultPrivileges {
2817    /// Add a new default privilege into the set of all default privileges.
2818    pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
2819        if privilege.acl_mode.is_empty() {
2820            return;
2821        }
2822
2823        let privileges = self.privileges.entry(object).or_default();
2824        if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2825            default_privilege.acl_mode |= privilege.acl_mode;
2826        } else {
2827            privileges.insert(privilege.grantee, privilege);
2828        }
2829    }
2830
2831    /// Revoke a default privilege from the set of all default privileges.
2832    pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
2833        if let Some(privileges) = self.privileges.get_mut(object) {
2834            if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2835                default_privilege.acl_mode =
2836                    default_privilege.acl_mode.difference(privilege.acl_mode);
2837                if default_privilege.acl_mode.is_empty() {
2838                    privileges.remove(&privilege.grantee);
2839                }
2840            }
2841            if privileges.is_empty() {
2842                self.privileges.remove(object);
2843            }
2844        }
2845    }
2846
2847    /// Get the privileges that will be granted on all objects matching `object` to `grantee`, if
2848    /// any exist.
2849    pub fn get_privileges_for_grantee(
2850        &self,
2851        object: &DefaultPrivilegeObject,
2852        grantee: &RoleId,
2853    ) -> Option<&AclMode> {
2854        self.privileges
2855            .get(object)
2856            .and_then(|privileges| privileges.get(grantee))
2857            .map(|privilege| &privilege.acl_mode)
2858    }
2859
2860    /// Get all default privileges that apply to the provided object details.
2861    pub fn get_applicable_privileges(
2862        &self,
2863        role_id: RoleId,
2864        database_id: Option<DatabaseId>,
2865        schema_id: Option<SchemaId>,
2866        object_type: mz_sql::catalog::ObjectType,
2867    ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
2868        // Privileges consider all relations to be of type table due to PostgreSQL compatibility. We
2869        // don't require the caller to worry about that and we will map their `object_type` to the
2870        // correct type for privileges.
2871        let privilege_object_type = if object_type.is_relation() {
2872            mz_sql::catalog::ObjectType::Table
2873        } else {
2874            object_type
2875        };
2876        let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
2877
2878        // Collect all entries that apply to the provided object details.
2879        // If either `database_id` or `schema_id` are `None`, then we might end up with duplicate
2880        // entries in the vec below. That's OK because we consolidate the results after.
2881        [
2882            DefaultPrivilegeObject {
2883                role_id,
2884                database_id,
2885                schema_id,
2886                object_type: privilege_object_type,
2887            },
2888            DefaultPrivilegeObject {
2889                role_id,
2890                database_id,
2891                schema_id: None,
2892                object_type: privilege_object_type,
2893            },
2894            DefaultPrivilegeObject {
2895                role_id,
2896                database_id: None,
2897                schema_id: None,
2898                object_type: privilege_object_type,
2899            },
2900            DefaultPrivilegeObject {
2901                role_id: RoleId::Public,
2902                database_id,
2903                schema_id,
2904                object_type: privilege_object_type,
2905            },
2906            DefaultPrivilegeObject {
2907                role_id: RoleId::Public,
2908                database_id,
2909                schema_id: None,
2910                object_type: privilege_object_type,
2911            },
2912            DefaultPrivilegeObject {
2913                role_id: RoleId::Public,
2914                database_id: None,
2915                schema_id: None,
2916                object_type: privilege_object_type,
2917            },
2918        ]
2919        .into_iter()
2920        .filter_map(|object| self.privileges.get(&object))
2921        .flat_map(|acl_map| acl_map.values())
2922        // Consolidate privileges with a common grantee.
2923        .fold(
2924            BTreeMap::new(),
2925            |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
2926                let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
2927                *accum_acl_mode |= *acl_mode;
2928                accum
2929            },
2930        )
2931        .into_iter()
2932        // Restrict the acl_mode to only privileges valid for the provided object type. If the
2933        // default privilege has an object type of Table, then it may contain privileges valid for
2934        // tables but not other relations. If the passed in object type is another relation, then
2935        // we need to remove any privilege that is not valid for the specified relation.
2936        .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
2937        // Filter out empty privileges.
2938        .filter(|(_, acl_mode)| !acl_mode.is_empty())
2939        .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
2940            grantee: *grantee,
2941            acl_mode,
2942        })
2943    }
2944
2945    pub fn iter(
2946        &self,
2947    ) -> impl Iterator<
2948        Item = (
2949            &DefaultPrivilegeObject,
2950            impl Iterator<Item = &DefaultPrivilegeAclItem>,
2951        ),
2952    > {
2953        self.privileges
2954            .iter()
2955            .map(|(object, acl_map)| (object, acl_map.values()))
2956    }
2957}
2958
/// In-memory configuration of a cluster: its managed/unmanaged variant plus an optional
/// workload class label.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterConfig {
    pub variant: ClusterVariant,
    pub workload_class: Option<String>,
}
2964
2965impl ClusterConfig {
2966    pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
2967        match &self.variant {
2968            ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
2969            ClusterVariant::Unmanaged => None,
2970        }
2971    }
2972}
2973
2974impl From<ClusterConfig> for durable::ClusterConfig {
2975    fn from(config: ClusterConfig) -> Self {
2976        Self {
2977            variant: config.variant.into(),
2978            workload_class: config.workload_class,
2979        }
2980    }
2981}
2982
2983impl From<durable::ClusterConfig> for ClusterConfig {
2984    fn from(config: durable::ClusterConfig) -> Self {
2985        Self {
2986            variant: config.variant.into(),
2987            workload_class: config.workload_class,
2988        }
2989    }
2990}
2991
/// In-memory settings for a managed cluster: sizing, placement, logging, replication,
/// optimizer feature overrides, and schedule.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterVariantManaged {
    pub size: String,
    pub availability_zones: Vec<String>,
    pub logging: ReplicaLogging,
    pub replication_factor: u32,
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    pub schedule: ClusterSchedule,
}
3001
3002impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3003    fn from(managed: ClusterVariantManaged) -> Self {
3004        Self {
3005            size: managed.size,
3006            availability_zones: managed.availability_zones,
3007            logging: managed.logging,
3008            replication_factor: managed.replication_factor,
3009            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3010            schedule: managed.schedule,
3011        }
3012    }
3013}
3014
3015impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3016    fn from(managed: durable::ClusterVariantManaged) -> Self {
3017        Self {
3018            size: managed.size,
3019            availability_zones: managed.availability_zones,
3020            logging: managed.logging,
3021            replication_factor: managed.replication_factor,
3022            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3023            schedule: managed.schedule,
3024        }
3025    }
3026}
3027
/// Whether a cluster's replicas are managed by the system or by the user.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterVariant {
    Managed(ClusterVariantManaged),
    Unmanaged,
}
3033
3034impl From<ClusterVariant> for durable::ClusterVariant {
3035    fn from(variant: ClusterVariant) -> Self {
3036        match variant {
3037            ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3038            ClusterVariant::Unmanaged => Self::Unmanaged,
3039        }
3040    }
3041}
3042
3043impl From<durable::ClusterVariant> for ClusterVariant {
3044    fn from(variant: durable::ClusterVariant) -> Self {
3045        match variant {
3046            durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3047            durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3048        }
3049    }
3050}
3051
// Exposes the in-memory `Database` through the SQL layer's catalog trait; each method
// simply reads the corresponding field.
impl mz_sql::catalog::CatalogDatabase for Database {
    fn name(&self) -> &str {
        &self.name
    }

    fn id(&self) -> DatabaseId {
        self.id
    }

    fn has_schemas(&self) -> bool {
        !self.schemas_by_name.is_empty()
    }

    fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
        &self.schemas_by_name
    }

    // `as` is ok to use to cast to a trait object.
    #[allow(clippy::as_conversions)]
    fn schemas(&self) -> Vec<&dyn CatalogSchema> {
        self.schemas_by_id
            .values()
            .map(|schema| schema as &dyn CatalogSchema)
            .collect()
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3086
// Exposes the in-memory `Schema` through the SQL layer's catalog trait.
impl mz_sql::catalog::CatalogSchema for Schema {
    fn database(&self) -> &ResolvedDatabaseSpecifier {
        &self.name.database
    }

    fn name(&self) -> &QualifiedSchemaName {
        &self.name
    }

    fn id(&self) -> &SchemaSpecifier {
        &self.id
    }

    // Note: only `items` is consulted here, while `item_ids` below also includes
    // functions and types.
    fn has_items(&self) -> bool {
        !self.items.is_empty()
    }

    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
        // Items, functions, and types are stored in separate maps; chain them all.
        Box::new(
            self.items
                .values()
                .chain(self.functions.values())
                .chain(self.types.values())
                .copied(),
        )
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3122
// Exposes the in-memory `Role` through the SQL layer's catalog trait.
impl mz_sql::catalog::CatalogRole for Role {
    fn name(&self) -> &str {
        &self.name
    }

    fn id(&self) -> RoleId {
        self.id
    }

    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
        &self.membership.map
    }

    fn attributes(&self) -> &RoleAttributes {
        &self.attributes
    }

    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
        &self.vars.map
    }
}
3144
// Exposes the in-memory `NetworkPolicy` through the SQL layer's catalog trait.
impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
    fn name(&self) -> &str {
        &self.name
    }

    fn id(&self) -> NetworkPolicyId {
        self.id
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3162
// Exposes the in-memory `Cluster` through the SQL layer's catalog trait. Several trait
// methods (`replicas`, `replica`, `is_managed`, `try_to_plan`) share a name with inherent
// `Cluster` methods and delegate to them; inherent methods take precedence over trait
// methods in resolution, so these calls do not recurse.
impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
    fn name(&self) -> &str {
        &self.name
    }

    fn id(&self) -> ClusterId {
        self.id
    }

    fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
        &self.bound_objects
    }

    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
        &self.replica_id_by_name_
    }

    // `as` is ok to use to cast to a trait object.
    #[allow(clippy::as_conversions)]
    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
        self.replicas()
            .map(|replica| replica as &dyn CatalogClusterReplica)
            .collect()
    }

    // Panics if `id` is not a replica of this cluster, which would indicate the catalog
    // is internally inconsistent.
    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
        self.replica(id).expect("catalog out of sync")
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }

    fn is_managed(&self) -> bool {
        self.is_managed()
    }

    /// Returns the managed cluster size, or `None` for unmanaged clusters.
    fn managed_size(&self) -> Option<&str> {
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
            _ => None,
        }
    }

    /// Returns the managed cluster schedule, or `None` for unmanaged clusters.
    fn schedule(&self) -> Option<&ClusterSchedule> {
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
            _ => None,
        }
    }

    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
        self.try_to_plan()
    }
}
3222
// Exposes the in-memory `ClusterReplica` through the SQL layer's catalog trait.
impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
    fn name(&self) -> &str {
        &self.name
    }

    fn cluster_id(&self) -> ClusterId {
        self.cluster_id
    }

    fn replica_id(&self) -> ReplicaId {
        self.replica_id
    }

    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    // Whether the replica's location is internal-only (not user visible).
    fn internal(&self) -> bool {
        self.config.location.internal()
    }
}
3244
3245impl mz_sql::catalog::CatalogItem for CatalogEntry {
    // Delegates to the inherent `CatalogEntry::name`.
    fn name(&self) -> &QualifiedItemName {
        self.name()
    }
3249
    // Delegates to the inherent `CatalogEntry::id`.
    fn id(&self) -> CatalogItemId {
        self.id()
    }
3253
    // Boxes the inherent `CatalogEntry::global_ids` iterator to satisfy the trait's
    // object-safe return type.
    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.global_ids())
    }
3257
    // Delegates to the inherent `CatalogEntry::oid`.
    fn oid(&self) -> u32 {
        self.oid()
    }
3261
    // Delegates to the inherent `CatalogEntry::func`.
    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.func()
    }
3265
3266    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3267        self.source_desc()
3268    }
3269
3270    fn connection(
3271        &self,
3272    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3273    {
3274        Ok(self.connection()?.details.to_connection())
3275    }
3276
3277    fn create_sql(&self) -> &str {
3278        match self.item() {
3279            CatalogItem::Table(Table { create_sql, .. }) => {
3280                create_sql.as_deref().unwrap_or("<builtin>")
3281            }
3282            CatalogItem::Source(Source { create_sql, .. }) => {
3283                create_sql.as_deref().unwrap_or("<builtin>")
3284            }
3285            CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3286            CatalogItem::View(View { create_sql, .. }) => create_sql,
3287            CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3288            CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3289            CatalogItem::Type(Type { create_sql, .. }) => {
3290                create_sql.as_deref().unwrap_or("<builtin>")
3291            }
3292            CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3293            CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3294            CatalogItem::Func(_) => "<builtin>",
3295            CatalogItem::Log(_) => "<builtin>",
3296            CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
3297        }
3298    }
3299
3300    fn item_type(&self) -> SqlCatalogItemType {
3301        self.item().typ()
3302    }
3303
3304    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3305        if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3306            Some((keys, *on))
3307        } else {
3308            None
3309        }
3310    }
3311
3312    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3313        if let CatalogItem::Table(Table {
3314            data_source: TableDataSource::TableWrites { defaults },
3315            ..
3316        }) = self.item()
3317        {
3318            Some(defaults.as_slice())
3319        } else {
3320            None
3321        }
3322    }
3323
3324    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3325        if let CatalogItem::Type(Type { details, .. }) = self.item() {
3326            Some(details)
3327        } else {
3328            None
3329        }
3330    }
3331
3332    fn references(&self) -> &ResolvedIds {
3333        self.references()
3334    }
3335
3336    fn uses(&self) -> BTreeSet<CatalogItemId> {
3337        self.uses()
3338    }
3339
3340    fn referenced_by(&self) -> &[CatalogItemId] {
3341        self.referenced_by()
3342    }
3343
3344    fn used_by(&self) -> &[CatalogItemId] {
3345        self.used_by()
3346    }
3347
3348    fn subsource_details(
3349        &self,
3350    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3351        self.subsource_details()
3352    }
3353
3354    fn source_export_details(
3355        &self,
3356    ) -> Option<(
3357        CatalogItemId,
3358        &UnresolvedItemName,
3359        &SourceExportDetails,
3360        &SourceExportDataConfig<ReferencedConnection>,
3361    )> {
3362        self.source_export_details()
3363    }
3364
3365    fn is_progress_source(&self) -> bool {
3366        self.is_progress_source()
3367    }
3368
3369    fn progress_id(&self) -> Option<CatalogItemId> {
3370        self.progress_id()
3371    }
3372
3373    fn owner_id(&self) -> RoleId {
3374        self.owner_id
3375    }
3376
3377    fn privileges(&self) -> &PrivilegeMap {
3378        &self.privileges
3379    }
3380
3381    fn cluster_id(&self) -> Option<ClusterId> {
3382        self.item().cluster_id()
3383    }
3384
3385    fn at_version(
3386        &self,
3387        version: RelationVersionSelector,
3388    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3389        Box::new(CatalogCollectionEntry {
3390            entry: self.clone(),
3391            version,
3392        })
3393    }
3394
3395    fn latest_version(&self) -> Option<RelationVersion> {
3396        self.table().map(|t| t.desc.latest_version())
3397    }
3398}
3399
/// A single update to the catalog state.
#[derive(Debug)]
pub struct StateUpdate {
    /// The kind of object being updated, along with its new or old contents.
    pub kind: StateUpdateKind,
    /// The timestamp at which this update occurred.
    pub ts: Timestamp,
    /// Whether the update adds or retracts the object.
    pub diff: StateDiff,
}
3407
/// The contents of a single state update.
///
/// Variants are listed in dependency order.
///
/// See [`BootstrapStateUpdateKind`] for an otherwise-identical enum that
/// omits `TemporaryItem` so it can derive [`Ord`].
#[derive(Debug, Clone)]
pub enum StateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    // Temporary items are not actually updated via the durable catalog, but this allows us to
    // model them the same way as all other items.
    TemporaryItem(TemporaryItem),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    // Storage updates.
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3436
/// Valid diffs for catalog state updates.
///
/// Converts to/from [`Diff`]: `Retraction` corresponds to `-1` and
/// `Addition` to `+1`; any other diff value is invalid.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub enum StateDiff {
    /// The object is being removed from the catalog state.
    Retraction,
    /// The object is being added to the catalog state.
    Addition,
}
3443
3444impl From<StateDiff> for Diff {
3445    fn from(diff: StateDiff) -> Self {
3446        match diff {
3447            StateDiff::Retraction => Diff::MINUS_ONE,
3448            StateDiff::Addition => Diff::ONE,
3449        }
3450    }
3451}
3452impl TryFrom<Diff> for StateDiff {
3453    type Error = String;
3454
3455    fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3456        match diff {
3457            Diff::MINUS_ONE => Ok(Self::Retraction),
3458            Diff::ONE => Ok(Self::Addition),
3459            diff => Err(format!("invalid diff {diff}")),
3460        }
3461    }
3462}
3463
/// Information needed to process an update to a temporary item.
#[derive(Debug, Clone)]
pub struct TemporaryItem {
    /// ID of the item.
    pub id: CatalogItemId,
    /// OID of the item.
    pub oid: u32,
    /// Fully qualified name of the item.
    pub name: QualifiedItemName,
    /// The in-memory catalog item itself.
    pub item: CatalogItem,
    /// The role that owns the item.
    pub owner_id: RoleId,
    /// Privileges granted on the item.
    pub privileges: PrivilegeMap,
}
3474
3475impl From<CatalogEntry> for TemporaryItem {
3476    fn from(entry: CatalogEntry) -> Self {
3477        TemporaryItem {
3478            id: entry.id,
3479            oid: entry.oid,
3480            name: entry.name,
3481            item: entry.item,
3482            owner_id: entry.owner_id,
3483            privileges: entry.privileges,
3484        }
3485    }
3486}
3487
/// The same as [`StateUpdateKind`], but without `TemporaryItem` so we can derive [`Ord`].
///
/// Variants are listed in dependency order, mirroring [`StateUpdateKind`].
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum BootstrapStateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    // Storage updates.
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3511
3512impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3513    fn from(value: BootstrapStateUpdateKind) -> Self {
3514        match value {
3515            BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3516            BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3517            BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3518            BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3519            BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3520                StateUpdateKind::DefaultPrivilege(kind)
3521            }
3522            BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3523                StateUpdateKind::SystemPrivilege(kind)
3524            }
3525            BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3526                StateUpdateKind::SystemConfiguration(kind)
3527            }
3528            BootstrapStateUpdateKind::SourceReferences(kind) => {
3529                StateUpdateKind::SourceReferences(kind)
3530            }
3531            BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3532            BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3533            BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3534                StateUpdateKind::IntrospectionSourceIndex(kind)
3535            }
3536            BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3537            BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3538                StateUpdateKind::SystemObjectMapping(kind)
3539            }
3540            BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3541            BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3542            BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3543            BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3544                StateUpdateKind::StorageCollectionMetadata(kind)
3545            }
3546            BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3547                StateUpdateKind::UnfinalizedShard(kind)
3548            }
3549        }
3550    }
3551}
3552
/// Converts a [`StateUpdateKind`] into a [`BootstrapStateUpdateKind`].
///
/// The only variant without a counterpart is `TemporaryItem` (temporary items
/// are never written to the durable catalog), so the conversion fails on that
/// variant and hands the [`TemporaryItem`] back to the caller as the error.
impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
    type Error = TemporaryItem;

    fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
        match value {
            StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
            StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
            StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
            StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
            StateUpdateKind::DefaultPrivilege(kind) => {
                Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
            }
            StateUpdateKind::SystemPrivilege(kind) => {
                Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
            }
            StateUpdateKind::SystemConfiguration(kind) => {
                Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
            }
            StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
            StateUpdateKind::NetworkPolicy(kind) => {
                Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
            }
            StateUpdateKind::IntrospectionSourceIndex(kind) => {
                Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
            }
            StateUpdateKind::ClusterReplica(kind) => {
                Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
            }
            StateUpdateKind::SourceReferences(kind) => {
                Ok(BootstrapStateUpdateKind::SourceReferences(kind))
            }
            StateUpdateKind::SystemObjectMapping(kind) => {
                Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
            }
            // Temporary items have no durable counterpart; return the item to
            // the caller instead.
            StateUpdateKind::TemporaryItem(kind) => Err(kind),
            StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
            StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
            StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
            StateUpdateKind::StorageCollectionMetadata(kind) => {
                Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
            }
            StateUpdateKind::UnfinalizedShard(kind) => {
                Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
            }
        }
    }
}