mz_catalog/memory/objects.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! The current types used by the in-memory Catalog. Many of the objects in this module are
//! extremely similar to the objects found in [`crate::durable::objects`] but in a format that is
//! more easily consumed by higher layers.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, LazyLock};

use chrono::{DateTime, Utc};
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_compute_client::logging::LogVariant;
use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
use mz_ore::collections::CollectionExt;
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
    RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::{
    ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
    UnresolvedItemName, Value, WithOptionValue,
};
use mz_sql::catalog::{
    CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
    CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogTypeDetails,
    DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, RoleAttributes, RoleMembership,
    RoleVars, SystemObjectType,
};
use mz_sql::names::{
    Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
    QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
};
use mz_sql::plan::{
    ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
    CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
    HirRelationExpr, NetworkPolicyRule, PlanError, WebhookBodyFormat, WebhookHeaders,
    WebhookValidation,
};
use mz_sql::rbac;
use mz_sql::session::vars::OwnedVarInput;
use mz_storage_client::controller::IntrospectionType;
use mz_storage_types::connections::inline::ReferencedConnection;
use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
use mz_storage_types::sources::load_generator::LoadGenerator;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
    SourceExportDetails, Timeline,
};
use serde::ser::SerializeSeq;
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;
use tracing::debug;

use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
use crate::durable;

/// Used to update `self` in place from an input value, consuming that value.
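///
/// # Example
///
/// A minimal sketch of the intended usage (illustrative only, not compiled as
/// a doctest; `old` and `new` are hypothetical `durable::Database` values read
/// from the durable catalog):
///
/// ```ignore
/// let mut db = Database::from(old);
/// // Later, refresh the in-memory value from a newer durable copy.
/// db.update_from(new);
/// ```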
pub trait UpdateFrom<T>: From<T> {
    fn update_from(&mut self, from: T);
}

#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    pub name: String,
    pub id: DatabaseId,
    pub oid: u32,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}

impl From<Database> for durable::Database {
    fn from(database: Database) -> durable::Database {
        durable::Database {
            id: database.id,
            oid: database.oid,
            name: database.name,
            owner_id: database.owner_id,
            privileges: database.privileges.into_all_values().collect(),
        }
    }
}

impl From<durable::Database> for Database {
    fn from(
        durable::Database {
            id,
            oid,
            name,
            owner_id,
            privileges,
        }: durable::Database,
    ) -> Database {
        Database {
            id,
            oid,
            schemas_by_id: BTreeMap::new(),
            schemas_by_name: BTreeMap::new(),
            name,
            owner_id,
            privileges: PrivilegeMap::from_mz_acl_items(privileges),
        }
    }
}

impl UpdateFrom<durable::Database> for Database {
    fn update_from(
        &mut self,
        durable::Database {
            id,
            oid,
            name,
            owner_id,
            privileges,
        }: durable::Database,
    ) {
        self.id = id;
        self.oid = oid;
        self.name = name;
        self.owner_id = owner_id;
        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
    }
}

#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
    pub name: QualifiedSchemaName,
    pub id: SchemaSpecifier,
    pub oid: u32,
    pub items: BTreeMap<String, CatalogItemId>,
    pub functions: BTreeMap<String, CatalogItemId>,
    pub types: BTreeMap<String, CatalogItemId>,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}

impl From<Schema> for durable::Schema {
    fn from(schema: Schema) -> durable::Schema {
        durable::Schema {
            id: schema.id.into(),
            oid: schema.oid,
            name: schema.name.schema,
            database_id: schema.name.database.id(),
            owner_id: schema.owner_id,
            privileges: schema.privileges.into_all_values().collect(),
        }
    }
}

impl From<durable::Schema> for Schema {
    fn from(
        durable::Schema {
            id,
            oid,
            name,
            database_id,
            owner_id,
            privileges,
        }: durable::Schema,
    ) -> Schema {
        Schema {
            name: QualifiedSchemaName {
                database: database_id.into(),
                schema: name,
            },
            id: id.into(),
            oid,
            items: BTreeMap::new(),
            functions: BTreeMap::new(),
            types: BTreeMap::new(),
            owner_id,
            privileges: PrivilegeMap::from_mz_acl_items(privileges),
        }
    }
}

impl UpdateFrom<durable::Schema> for Schema {
    fn update_from(
        &mut self,
        durable::Schema {
            id,
            oid,
            name,
            database_id,
            owner_id,
            privileges,
        }: durable::Schema,
    ) {
        self.name = QualifiedSchemaName {
            database: database_id.into(),
            schema: name,
        };
        self.id = id.into();
        self.oid = oid;
        self.owner_id = owner_id;
        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
    }
}

#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    pub name: String,
    pub id: RoleId,
    pub oid: u32,
    pub attributes: RoleAttributes,
    pub membership: RoleMembership,
    pub vars: RoleVars,
}

impl Role {
    pub fn is_user(&self) -> bool {
        self.id.is_user()
    }

    pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
        self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
    }
}

impl From<Role> for durable::Role {
    fn from(role: Role) -> durable::Role {
        durable::Role {
            id: role.id,
            oid: role.oid,
            name: role.name,
            attributes: role.attributes,
            membership: role.membership,
            vars: role.vars,
        }
    }
}

impl From<durable::Role> for Role {
    fn from(
        durable::Role {
            id,
            oid,
            name,
            attributes,
            membership,
            vars,
        }: durable::Role,
    ) -> Self {
        Role {
            name,
            id,
            oid,
            attributes,
            membership,
            vars,
        }
    }
}

impl UpdateFrom<durable::Role> for Role {
    fn update_from(
        &mut self,
        durable::Role {
            id,
            oid,
            name,
            attributes,
            membership,
            vars,
        }: durable::Role,
    ) {
        self.id = id;
        self.oid = oid;
        self.name = name;
        self.attributes = attributes;
        self.membership = membership;
        self.vars = vars;
    }
}

#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RoleAuth {
    pub role_id: RoleId,
    pub password_hash: Option<String>,
    pub updated_at: u64,
}

impl From<RoleAuth> for durable::RoleAuth {
    fn from(role_auth: RoleAuth) -> durable::RoleAuth {
        durable::RoleAuth {
            role_id: role_auth.role_id,
            password_hash: role_auth.password_hash,
            updated_at: role_auth.updated_at,
        }
    }
}

impl From<durable::RoleAuth> for RoleAuth {
    fn from(
        durable::RoleAuth {
            role_id,
            password_hash,
            updated_at,
        }: durable::RoleAuth,
    ) -> RoleAuth {
        RoleAuth {
            role_id,
            password_hash,
            updated_at,
        }
    }
}

impl UpdateFrom<durable::RoleAuth> for RoleAuth {
    fn update_from(&mut self, from: durable::RoleAuth) {
        self.role_id = from.role_id;
        self.password_hash = from.password_hash;
    }
}

#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    pub name: String,
    pub id: ClusterId,
    pub config: ClusterConfig,
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    /// Objects bound to this cluster. Does not include introspection source
    /// indexes.
    pub bound_objects: BTreeSet<CatalogItemId>,
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}

impl Cluster {
    /// The role of the cluster. Currently used to set alert severity.
    pub fn role(&self) -> ClusterRole {
        // NOTE - These roles power monitoring systems. Do not change
        // them without talking to the cloud or observability groups.
        if self.name == MZ_SYSTEM_CLUSTER.name {
            ClusterRole::SystemCritical
        } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
            ClusterRole::System
        } else {
            ClusterRole::User
        }
    }

    /// Returns `true` if the cluster is a managed cluster.
    pub fn is_managed(&self) -> bool {
        matches!(self.config.variant, ClusterVariant::Managed { .. })
    }

    /// Lists the user replicas, which are those that do not have the internal flag set.
    pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.replicas().filter(|r| !r.config.location.internal())
    }

    /// Lists all replicas in the cluster.
    pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
        self.replicas_by_id_.values()
    }

    /// Looks up a replica by ID.
    pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
        self.replicas_by_id_.get(&replica_id)
    }

    /// Looks up a replica ID by name.
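    ///
    /// A minimal sketch of resolving a replica by name (illustrative only;
    /// `cluster` is a hypothetical `Cluster` value):
    ///
    /// ```ignore
    /// let replica = cluster
    ///     .replica_id("r1")
    ///     .and_then(|id| cluster.replica(id));
    /// ```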
    pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
        self.replica_id_by_name_.get(name).copied()
    }

    /// Returns the availability zones of this cluster, if they exist.
    pub fn availability_zones(&self) -> Option<&[String]> {
        match &self.config.variant {
            ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
            ClusterVariant::Unmanaged => None,
        }
    }

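    /// Attempts to reconstruct a [`CreateClusterPlan`] from this cluster's
    /// in-memory configuration, e.g. to support `SHOW CREATE` output.
    /// Unmanaged clusters are not supported and return a [`PlanError`].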
    pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
        let name = self.name.clone();
        let variant = match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged {
                size,
                availability_zones,
                logging,
                replication_factor,
                optimizer_feature_overrides,
                schedule,
            }) => {
                let introspection = match logging {
                    ReplicaLogging {
                        log_logging,
                        interval: Some(interval),
                    } => Some(ComputeReplicaIntrospectionConfig {
                        debugging: *log_logging,
                        interval: interval.clone(),
                    }),
                    ReplicaLogging {
                        log_logging: _,
                        interval: None,
                    } => None,
                };
                let compute = ComputeReplicaConfig { introspection };
                CreateClusterVariant::Managed(CreateClusterManagedPlan {
                    replication_factor: replication_factor.clone(),
                    size: size.clone(),
                    availability_zones: availability_zones.clone(),
                    compute,
                    optimizer_feature_overrides: optimizer_feature_overrides.clone(),
                    schedule: schedule.clone(),
                })
            }
            ClusterVariant::Unmanaged => {
                // Unmanaged clusters are deprecated, so hopefully we can remove
                // them before we have to implement this.
                return Err(PlanError::Unsupported {
                    feature: "SHOW CREATE for unmanaged clusters".to_string(),
                    discussion_no: None,
                });
            }
        };
        let workload_class = self.config.workload_class.clone();
        Ok(CreateClusterPlan {
            name,
            variant,
            workload_class,
        })
    }
}

impl From<Cluster> for durable::Cluster {
    fn from(cluster: Cluster) -> durable::Cluster {
        durable::Cluster {
            id: cluster.id,
            name: cluster.name,
            owner_id: cluster.owner_id,
            privileges: cluster.privileges.into_all_values().collect(),
            config: cluster.config.into(),
        }
    }
}

impl From<durable::Cluster> for Cluster {
    fn from(
        durable::Cluster {
            id,
            name,
            owner_id,
            privileges,
            config,
        }: durable::Cluster,
    ) -> Self {
        Cluster {
            name: name.clone(),
            id,
            bound_objects: BTreeSet::new(),
            log_indexes: BTreeMap::new(),
            replica_id_by_name_: BTreeMap::new(),
            replicas_by_id_: BTreeMap::new(),
            owner_id,
            privileges: PrivilegeMap::from_mz_acl_items(privileges),
            config: config.into(),
        }
    }
}

impl UpdateFrom<durable::Cluster> for Cluster {
    fn update_from(
        &mut self,
        durable::Cluster {
            id,
            name,
            owner_id,
            privileges,
            config,
        }: durable::Cluster,
    ) {
        self.id = id;
        self.name = name;
        self.owner_id = owner_id;
        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
        self.config = config.into();
    }
}

#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    pub name: String,
    pub cluster_id: ClusterId,
    pub replica_id: ReplicaId,
    pub config: ReplicaConfig,
    pub owner_id: RoleId,
}

impl From<ClusterReplica> for durable::ClusterReplica {
    fn from(replica: ClusterReplica) -> durable::ClusterReplica {
        durable::ClusterReplica {
            cluster_id: replica.cluster_id,
            replica_id: replica.replica_id,
            name: replica.name,
            config: replica.config.into(),
            owner_id: replica.owner_id,
        }
    }
}

#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    pub status: ClusterStatus,
    pub time: DateTime<Utc>,
}

#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
    pub updated_at: u64,
    pub references: Vec<SourceReference>,
}

#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
    pub name: String,
    pub namespace: Option<String>,
    pub columns: Vec<String>,
}

impl From<SourceReference> for durable::SourceReference {
    fn from(source_reference: SourceReference) -> durable::SourceReference {
        durable::SourceReference {
            name: source_reference.name,
            namespace: source_reference.namespace,
            columns: source_reference.columns,
        }
    }
}

impl SourceReferences {
    pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
        durable::SourceReferences {
            source_id,
            updated_at: self.updated_at,
            references: self.references.into_iter().map(Into::into).collect(),
        }
    }
}

impl From<durable::SourceReference> for SourceReference {
    fn from(source_reference: durable::SourceReference) -> SourceReference {
        SourceReference {
            name: source_reference.name,
            namespace: source_reference.namespace,
            columns: source_reference.columns,
        }
    }
}

impl From<durable::SourceReferences> for SourceReferences {
    fn from(source_references: durable::SourceReferences) -> SourceReferences {
        SourceReferences {
            updated_at: source_references.updated_at,
            references: source_references
                .references
                .into_iter()
                .map(|source_reference| source_reference.into())
                .collect(),
        }
    }
}

impl From<mz_sql::plan::SourceReference> for SourceReference {
    fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
        SourceReference {
            name: source_reference.name,
            namespace: source_reference.namespace,
            columns: source_reference.columns,
        }
    }
}

impl From<mz_sql::plan::SourceReferences> for SourceReferences {
    fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
        SourceReferences {
            updated_at: source_references.updated_at,
            references: source_references
                .references
                .into_iter()
                .map(|source_reference| source_reference.into())
                .collect(),
        }
    }
}

impl From<SourceReferences> for mz_sql::plan::SourceReferences {
    fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
        mz_sql::plan::SourceReferences {
            updated_at: source_references.updated_at,
            references: source_references
                .references
                .into_iter()
                .map(|source_reference| source_reference.into())
                .collect(),
        }
    }
}

impl From<SourceReference> for mz_sql::plan::SourceReference {
    fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
        mz_sql::plan::SourceReference {
            name: source_reference.name,
            namespace: source_reference.namespace,
            columns: source_reference.columns,
        }
    }
}

#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    pub item: CatalogItem,
    #[serde(skip)]
    pub referenced_by: Vec<CatalogItemId>,
    // TODO(database-issues#7922): this should have an invariant tied to it that all
    // dependents (i.e. entries in this field) have IDs greater than this
    // entry's ID.
    #[serde(skip)]
    pub used_by: Vec<CatalogItemId>,
    pub id: CatalogItemId,
    pub oid: u32,
    pub name: QualifiedItemName,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}

/// A [`CatalogEntry`] that is associated with a specific "collection" of data.
/// A single item in the catalog may be associated with multiple "collections".
///
/// Here "collection" generally means a pTVC, e.g. a Persist Shard, an Index, a
/// currently running dataflow, etc.
///
/// Items in the Catalog have a stable name -> ID mapping, in other words, for
/// the entire lifetime of an object its [`CatalogItemId`] will _never_ change.
/// Similarly, we need to maintain a stable mapping from [`GlobalId`] to pTVC.
/// This presents a challenge when `ALTER`-ing an object, e.g. adding columns
/// to a table. We can't just change the schema of the underlying Persist Shard
/// because that would be rebinding the [`GlobalId`] of the pTVC. Instead we
/// allocate a new [`GlobalId`] to refer to the new version of the table, and
/// then the [`CatalogEntry`] tracks the [`GlobalId`] for each version.
///
/// TODO(ct): Add a note here if we end up using this for associating continual
/// tasks with a single catalog item.
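///
/// # Example
///
/// A minimal sketch of wrapping an entry at a chosen version (illustrative
/// only; `entry` is a hypothetical `CatalogEntry`, and the version selector
/// shown is an assumption):
///
/// ```ignore
/// let latest = CatalogCollectionEntry {
///     entry,
///     version: RelationVersionSelector::Latest,
/// };
/// let desc = latest.desc_opt();
/// ```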
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
    pub entry: CatalogEntry,
    pub version: RelationVersionSelector,
}

impl CatalogCollectionEntry {
    pub fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
        self.item().desc(name, self.version)
    }

    pub fn desc_opt(&self) -> Option<Cow<'_, RelationDesc>> {
        self.item().desc_opt(self.version)
    }
}

impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
    fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
        CatalogCollectionEntry::desc(self, name)
    }

    fn global_id(&self) -> GlobalId {
        self.entry
            .item()
            .global_id_for_version(self.version)
            .expect("catalog corruption, missing version!")
    }
}

impl Deref for CatalogCollectionEntry {
    type Target = CatalogEntry;

    fn deref(&self) -> &CatalogEntry {
        &self.entry
    }
}

impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
    fn name(&self) -> &QualifiedItemName {
        self.entry.name()
    }

    fn id(&self) -> CatalogItemId {
        self.entry.id()
    }

    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.entry.global_ids())
    }

    fn oid(&self) -> u32 {
        self.entry.oid()
    }

    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.entry.func()
    }

    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.entry.source_desc()
    }

    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        mz_sql::catalog::CatalogItem::connection(&self.entry)
    }

    fn create_sql(&self) -> &str {
        self.entry.create_sql()
    }

    fn item_type(&self) -> SqlCatalogItemType {
        self.entry.item_type()
    }

    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        self.entry.index_details()
    }

    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        self.entry.writable_table_details()
    }

    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        self.entry.type_details()
    }

    fn references(&self) -> &ResolvedIds {
        self.entry.references()
    }

    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.entry.uses()
    }

    fn referenced_by(&self) -> &[CatalogItemId] {
        self.entry.referenced_by()
    }

    fn used_by(&self) -> &[CatalogItemId] {
        self.entry.used_by()
    }

    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.entry.subsource_details()
    }

    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.entry.source_export_details()
    }

    fn is_progress_source(&self) -> bool {
        self.entry.is_progress_source()
    }

    fn progress_id(&self) -> Option<CatalogItemId> {
        self.entry.progress_id()
    }

    fn owner_id(&self) -> RoleId {
        *self.entry.owner_id()
    }

    fn privileges(&self) -> &PrivilegeMap {
        self.entry.privileges()
    }

    fn cluster_id(&self) -> Option<ClusterId> {
        self.entry.item().cluster_id()
    }

    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        Box::new(CatalogCollectionEntry {
            entry: self.entry.clone(),
            version,
        })
    }

    fn latest_version(&self) -> Option<RelationVersion> {
        self.entry.latest_version()
    }
}

#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    Table(Table),
    Source(Source),
    Log(Log),
    View(View),
    MaterializedView(MaterializedView),
    Sink(Sink),
    Index(Index),
    Type(Type),
    Func(Func),
    Secret(Secret),
    Connection(Connection),
    ContinualTask(ContinualTask),
}

impl From<CatalogEntry> for durable::Item {
    fn from(entry: CatalogEntry) -> durable::Item {
        let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
        durable::Item {
            id: entry.id,
            oid: entry.oid,
            global_id,
            schema_id: entry.name.qualifiers.schema_spec.into(),
            name: entry.name.item,
            create_sql,
            owner_id: entry.owner_id,
            privileges: entry.privileges.into_all_values().collect(),
            extra_versions,
        }
    }
}

#[derive(Debug, Clone, Serialize)]
pub struct Table {
    /// Parse-able SQL that defines this table.
    pub create_sql: Option<String>,
    /// [`VersionedRelationDesc`] of this table, derived from the `create_sql`.
    pub desc: VersionedRelationDesc,
    /// Versions of this table, and the [`GlobalId`]s that refer to them.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this table, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the table's logical compaction window is controlled by the [`metrics_retention`]
    /// session variable.
    ///
    /// [`metrics_retention`]: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
    /// Where data for this table comes from, e.g. `INSERT` statements or an upstream source.
    pub data_source: TableDataSource,
}

impl Table {
    pub fn timeline(&self) -> Timeline {
        match &self.data_source {
            // The Coordinator controls insertions for writable tables
            // (including system tables), so they are realtime.
            TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
            TableDataSource::DataSource { timeline, .. } => timeline.clone(),
        }
    }

    /// Returns all of the [`GlobalId`]s that this [`Table`] can be referenced by.
    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.collections.values().copied()
    }

    /// Returns the latest [`GlobalId`] for this [`Table`] which should be used for writes.
    pub fn global_id_writes(&self) -> GlobalId {
        *self
            .collections
            .last_key_value()
            .expect("at least one version of a table")
            .1
    }

    /// Returns all of the collections and their [`RelationDesc`]s associated with this [`Table`].
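    ///
    /// A minimal sketch (illustrative only; `table` is a hypothetical `Table`
    /// value):
    ///
    /// ```ignore
    /// for (gid, version, desc) in table.collection_descs() {
    ///     println!("{gid} is version {version:?} with arity {}", desc.arity());
    /// }
    /// ```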
    pub fn collection_descs(
        &self,
    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
        self.collections.iter().map(|(version, gid)| {
            let desc = self
                .desc
                .at_version(RelationVersionSelector::Specific(*version));
            (*gid, *version, desc)
        })
    }

    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
        let (version, _gid) = self
            .collections
            .iter()
            .find(|(_version, gid)| *gid == id)
            .expect("GlobalId to exist");
        self.desc
            .at_version(RelationVersionSelector::Specific(*version))
    }
}

#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
    /// The table owns data created via INSERT/UPDATE/DELETE statements.
    TableWrites {
        #[serde(skip)]
        defaults: Vec<Expr<Aug>>,
    },

    /// The table receives its data from the identified `DataSourceDesc`.
    /// This table type does not support INSERT/UPDATE/DELETE statements.
    DataSource {
        desc: DataSourceDesc,
        timeline: Timeline,
    },
}

#[derive(Debug, Clone, Serialize)]
pub enum DataSourceDesc {
    /// Receives data from an external system.
    Ingestion {
        desc: SourceDesc<ReferencedConnection>,
        cluster_id: ClusterId,
    },
    /// Receives data from an external system.
    OldSyntaxIngestion {
        desc: SourceDesc<ReferencedConnection>,
        cluster_id: ClusterId,
        // If we're dealing with an old-syntax ingestion, the progress ID will be some other
        // collection, and the ingestion itself will have the data from an external reference.
        progress_subsource: CatalogItemId,
        data_config: SourceExportDataConfig<ReferencedConnection>,
        details: SourceExportDetails,
    },
    /// This source receives its data from the identified ingestion,
    /// specifically the output identified by `external_reference`.
    /// N.B. that `external_reference` should not be used to identify
    /// anything downstream of purification, as the purification process
    /// encodes source-specific identifiers into the `details` struct.
    /// The `external_reference` field is only used here for displaying
    /// human-readable names in system tables.
    IngestionExport {
        ingestion_id: CatalogItemId,
        external_reference: UnresolvedItemName,
        details: SourceExportDetails,
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// Receives introspection data from an internal system.
    Introspection(IntrospectionType),
    /// Receives data from the source's reclocking/remapping operations.
    Progress,
    /// Receives data from HTTP requests.
    Webhook {
        /// Optional components used to validate a webhook request.
        validate_using: Option<WebhookValidation>,
        /// Describes how we deserialize the body of a webhook request.
        body_format: WebhookBodyFormat,
        /// Describes whether or not to include headers and how to map them.
        headers: WebhookHeaders,
        /// The cluster which this source is associated with.
        cluster_id: ClusterId,
    },
}

impl DataSourceDesc {
    /// The key and value formats of the data source.
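    ///
    /// A minimal sketch (illustrative only; `data_source` is a hypothetical
    /// `DataSourceDesc` value):
    ///
    /// ```ignore
    /// let (key_format, value_format) = data_source.formats();
    /// // A key format is only ever reported alongside a value format.
    /// assert!(key_format.is_none() || value_format.is_some());
    /// ```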
    pub fn formats(&self) -> (Option<&str>, Option<&str>) {
        match &self {
            DataSourceDesc::Ingestion { .. } => (None, None),
            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
                match &data_config.encoding.as_ref() {
                    Some(encoding) => match &encoding.key {
                        Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
                        None => (None, Some(encoding.value.type_())),
                    },
                    None => (None, None),
                }
            }
            DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
                Some(encoding) => match &encoding.key {
                    Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
                    None => (None, Some(encoding.value.type_())),
                },
                None => (None, None),
            },
            DataSourceDesc::Introspection(_)
            | DataSourceDesc::Webhook { .. }
            | DataSourceDesc::Progress => (None, None),
        }
    }

    /// Envelope of the data source.
    pub fn envelope(&self) -> Option<&str> {
        // Note how "none"/"append-only" is different from `None`. Some
        // sources don't have an envelope (internal logs, for example), while
        // other sources have an envelope that we call the "NONE"-envelope.

        fn envelope_string(envelope: &SourceEnvelope) -> &str {
            match envelope {
                SourceEnvelope::None(_) => "none",
                SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
                    mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
                    mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
                        // NOTE(aljoscha): Should we somehow mark that this is
                        // using upsert internally? See note above about
                        // DEBEZIUM.
                        "debezium"
                    }
                    mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
                        "upsert-value-err-inline"
                    }
                },
                SourceEnvelope::CdcV2 => {
                    // TODO(aljoscha): Should we even report this? It's
                    // currently not exposed.
                    "materialize"
                }
            }
        }

        match self {
            // NOTE(aljoscha): We could move the block for ingestions into
            // `SourceEnvelope` itself, but that one feels more like an internal
            // thing and adapter should own how we represent envelopes as a
            // string? It would not be hard to convince me otherwise, though.
            DataSourceDesc::Ingestion { .. } => None,
            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
                Some(envelope_string(&data_config.envelope))
            }
            DataSourceDesc::IngestionExport { data_config, .. } => {
                Some(envelope_string(&data_config.envelope))
            }
            DataSourceDesc::Introspection(_)
            | DataSourceDesc::Webhook { .. }
            | DataSourceDesc::Progress => None,
        }
    }
}

#[derive(Debug, Clone, Serialize)]
pub struct Source {
    /// Parse-able SQL that defines this source.
    pub create_sql: Option<String>,
    /// [`GlobalId`] used to reference this source from outside the catalog.
    pub global_id: GlobalId,
    // TODO: Unskip: currently blocked on some inner BTreeMap<X, _> problems.
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    /// [`RelationDesc`] of this source, derived from the `create_sql`.
    pub desc: RelationDesc,
    /// The timeline this source exists on.
    pub timeline: Timeline,
    /// Other catalog objects referenced by this source, e.g. custom types.
    pub resolved_ids: ResolvedIds,
    /// This value is ignored for subsources, i.e. for
    /// [`DataSourceDesc::IngestionExport`]. Instead, it uses the primary
    /// source's logical compaction window.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the source's logical compaction window is controlled by
    /// METRICS_RETENTION.
    pub is_retained_metrics_object: bool,
}

impl Source {
    /// Creates a new `Source`.
    ///
    /// # Panics
    /// - If an ingestion-based plan is not given a cluster_id.
    /// - If a non-ingestion-based source has a defined cluster config in its plan.
    /// - If a non-ingestion-based source is given a cluster_id.
    pub fn new(
        plan: CreateSourcePlan,
        global_id: GlobalId,
        resolved_ids: ResolvedIds,
        custom_logical_compaction_window: Option<CompactionWindow>,
        is_retained_metrics_object: bool,
    ) -> Source {
        Source {
            create_sql: Some(plan.source.create_sql),
            data_source: match plan.source.data_source {
                mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
                    desc,
                    cluster_id: plan
                        .in_cluster
                        .expect("ingestion-based sources must be given a cluster ID"),
                },
                mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
                    desc,
                    progress_subsource,
                    data_config,
                    details,
                } => DataSourceDesc::OldSyntaxIngestion {
                    desc,
                    cluster_id: plan
                        .in_cluster
                        .expect("ingestion-based sources must be given a cluster ID"),
                    progress_subsource,
                    data_config,
                    details,
                },
                mz_sql::plan::DataSourceDesc::Progress => {
                    assert!(
                        plan.in_cluster.is_none(),
                        "subsources must not have a host config or cluster_id defined"
                    );
                    DataSourceDesc::Progress
                }
                mz_sql::plan::DataSourceDesc::IngestionExport {
                    ingestion_id,
                    external_reference,
                    details,
                    data_config,
                } => {
                    assert!(
                        plan.in_cluster.is_none(),
                        "subsources must not have a host config or cluster_id defined"
                    );
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference,
                        details,
                        data_config,
                    }
                }
                mz_sql::plan::DataSourceDesc::Webhook {
                    validate_using,
                    body_format,
                    headers,
                    cluster_id,
                } => {
                    mz_ore::soft_assert_or_log!(
                        cluster_id.is_none(),
                        "cluster_id set at Source level for Webhooks"
                    );
                    DataSourceDesc::Webhook {
                        validate_using,
                        body_format,
                        headers,
                        cluster_id: plan
                            .in_cluster
                            .expect("webhook sources must be given a cluster ID"),
                    }
                }
            },
            desc: plan.source.desc,
            global_id,
            timeline: plan.timeline,
            resolved_ids,
            custom_logical_compaction_window: plan
                .source
                .compaction_window
                .or(custom_logical_compaction_window),
            is_retained_metrics_object,
        }
    }

    /// Type of the source.
    pub fn source_type(&self) -> &str {
        match &self.data_source {
            DataSourceDesc::Ingestion { desc, .. }
            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
            DataSourceDesc::Progress => "progress",
            DataSourceDesc::IngestionExport { .. } => "subsource",
            DataSourceDesc::Introspection(_) => "source",
            DataSourceDesc::Webhook { .. } => "webhook",
        }
    }

    /// Connection ID of the source, if one exists.
    pub fn connection_id(&self) -> Option<CatalogItemId> {
        match &self.data_source {
            DataSourceDesc::Ingestion { desc, .. }
            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
            DataSourceDesc::IngestionExport { .. }
            | DataSourceDesc::Introspection(_)
            | DataSourceDesc::Webhook { .. }
            | DataSourceDesc::Progress => None,
        }
    }

    /// The single [`GlobalId`] that refers to this Source.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }

    /// The expensive resource that each source consumes is persist shards. To
    /// prevent abuse, we want to prevent users from creating sources that use an
    /// unbounded number of persist shards. But we also don't want to count
    /// persist shards that are mandated by the system (e.g., the progress
    /// shard) so that future versions of Materialize can introduce additional
    /// per-source shards (e.g., a per-source status shard) without impacting
    /// the limit calculation.
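    ///
    /// A minimal sketch of how a quota check might sum these counts
    /// (illustrative only; `user_sources` and `max_shards` are hypothetical):
    ///
    /// ```ignore
    /// let used: i64 = user_sources
    ///     .iter()
    ///     .map(|source| source.user_controllable_persist_shard_count())
    ///     .sum();
    /// assert!(used <= max_shards);
    /// ```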
    pub fn user_controllable_persist_shard_count(&self) -> i64 {
        match &self.data_source {
            DataSourceDesc::Ingestion { .. } => 0,
            DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
                match &desc.connection {
                    // These multi-output sources do not use their primary
                    // source's data shard, so we don't include it in accounting
                    // for users.
                    GenericSourceConnection::Postgres(_)
                    | GenericSourceConnection::MySql(_)
                    | GenericSourceConnection::SqlServer(_) => 0,
                    GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
                        // Load generators that output data in their primary shard
                        LoadGenerator::Clock
                        | LoadGenerator::Counter { .. }
                        | LoadGenerator::Datums
                        | LoadGenerator::KeyValue(_) => 1,
                        LoadGenerator::Auction
                        | LoadGenerator::Marketing
                        | LoadGenerator::Tpch { .. } => 0,
                    },
                    GenericSourceConnection::Kafka(_) => 1,
                }
            }
            //  DataSourceDesc::IngestionExport represents a subsource, which
            //  uses a data shard.
            DataSourceDesc::IngestionExport { .. } => 1,
            DataSourceDesc::Webhook { .. } => 1,
            // Introspection and progress subsources are not under the user's control, so shouldn't
            // count toward their quota.
            DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => 0,
        }
    }
}

#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// The category of data this log stores.
    pub variant: LogVariant,
    /// [`GlobalId`] used to reference this log from outside the catalog.
    pub global_id: GlobalId,
}

impl Log {
    /// The single [`GlobalId`] that refers to this Log.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}

#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    /// Parse-able SQL that defines this sink.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this sink from outside the catalog, e.g. storage.
    pub global_id: GlobalId,
    /// Collection we read into this sink.
    pub from: GlobalId,
    /// Connection to the external service we're sinking into, e.g. Kafka.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// Envelope we use to sink into the external system.
    ///
    /// TODO(guswynn): this probably should just be in the `connection`.
    pub envelope: SinkEnvelope,
    /// Emit an initial snapshot into the sink.
    pub with_snapshot: bool,
    /// Used to fence other writes into this sink as we evolve the upstream materialized view.
    pub version: u64,
    /// Other catalog objects this sink references.
    pub resolved_ids: ResolvedIds,
    /// Cluster this sink runs on.
    pub cluster_id: ClusterId,
}

impl Sink {
    pub fn sink_type(&self) -> &str {
        self.connection.name()
    }

    /// Envelope of the sink.
    pub fn envelope(&self) -> Option<&str> {
        match &self.envelope {
            SinkEnvelope::Debezium => Some("debezium"),
            SinkEnvelope::Upsert => Some("upsert"),
        }
    }

    /// Output a combined format string of the sink. For legacy reasons, if the
    /// key format is none or the key and value formats are both the same
    /// (either avro or json), we return the value format name; otherwise we
    /// return a composite name.
    pub fn combined_format(&self) -> Option<Cow<'_, str>> {
        match &self.connection {
            StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
            _ => None,
        }
    }

    /// Output the distinct `key_format` and `value_format` of the sink.
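    ///
    /// A minimal sketch (illustrative only; `sink` is a hypothetical Kafka
    /// `Sink` value):
    ///
    /// ```ignore
    /// if let Some((key_format, value_format)) = sink.formats() {
    ///     println!("key: {key_format:?}, value: {value_format}");
    /// }
    /// ```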
    pub fn formats(&self) -> Option<(Option<&str>, &str)> {
        match &self.connection {
            StorageSinkConnection::Kafka(connection) => {
                let key_format = connection
                    .format
                    .key_format
                    .as_ref()
                    .map(|f| f.get_format_name());
                let value_format = connection.format.value_format.get_format_name();
                Some((key_format, value_format))
            }
            _ => None,
        }
    }

    pub fn connection_id(&self) -> Option<CatalogItemId> {
        self.connection.connection_id()
    }

    /// The single [`GlobalId`] that this Sink can be referenced by.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}

#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// Parse-able SQL that defines this view.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this view from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// Unoptimized high-level expression from parsing the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression from (locally) optimizing the `raw_expr`.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Columns of this view.
    pub desc: RelationDesc,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects that are referenced by this view, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this view.
    pub dependencies: DependencyIds,
}

impl View {
    /// The single [`GlobalId`] this [`View`] can be referenced by.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}

#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// Parse-able SQL that defines this materialized view.
    pub create_sql: String,
    /// Versions of this materialized view, and the [`GlobalId`]s that refer to them.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// Raw high-level expression from planning, derived from the `create_sql`.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Optimized mid-level expression, derived from the `raw_expr`.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// [`VersionedRelationDesc`] of this materialized view, derived from the `create_sql`.
    pub desc: VersionedRelationDesc,
    /// Other catalog items that this materialized view references, determined at name resolution.
    pub resolved_ids: ResolvedIds,
    /// All of the catalog objects that are referenced by this view.
    pub dependencies: DependencyIds,
    /// Cluster that this materialized view runs on.
    pub cluster_id: ClusterId,
    /// Column indexes that we assert are not `NULL`.
    ///
    /// TODO(parkmycar): Switch this to use the `ColumnIdx` type.
    pub non_null_assertions: Vec<usize>,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Schedule to refresh this materialized view, e.g. set via `REFRESH EVERY` option.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// The initial `as_of` of the storage collection associated with the materialized view.
    ///
    /// Note: This doesn't change upon restarts.
    /// (The dataflow's initial `as_of` can be different.)
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}

impl MaterializedView {
    /// Returns all [`GlobalId`]s that this [`MaterializedView`] can be referenced by.
    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.collections.values().copied()
    }

    /// The latest [`GlobalId`] for this [`MaterializedView`] which represents the writing
    /// version.
    pub fn global_id_writes(&self) -> GlobalId {
        *self
            .collections
            .last_key_value()
            .expect("at least one version of a materialized view")
            .1
    }

    /// Returns all collections and their [`RelationDesc`]s associated with this [`MaterializedView`].
    pub fn collection_descs(
        &self,
    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
        self.collections.iter().map(|(version, gid)| {
            let desc = self
                .desc
                .at_version(RelationVersionSelector::Specific(*version));
            (*gid, *version, desc)
        })
    }

    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
        let (version, _gid) = self
            .collections
            .iter()
            .find(|(_version, gid)| *gid == id)
            .expect("GlobalId to exist");
        self.desc
            .at_version(RelationVersionSelector::Specific(*version))
    }
}

#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// Parse-able SQL that defines this index.
    pub create_sql: String,
    /// [`GlobalId`] used to reference this index from outside the catalog, e.g. compute.
    pub global_id: GlobalId,
    /// The [`GlobalId`] this Index is on.
    pub on: GlobalId,
    /// Keys of the index.
    pub keys: Arc<[MirScalarExpr]>,
    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
    pub conn_id: Option<ConnectionId>,
    /// Other catalog objects referenced by this index, e.g. the object we're indexing.
    pub resolved_ids: ResolvedIds,
    /// Cluster this index is installed on.
    pub cluster_id: ClusterId,
    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the index's logical compaction window is controlled by the [`metrics_retention`]
    /// session variable.
    ///
    /// [`metrics_retention`]: mz_sql::session::vars::METRICS_RETENTION
    pub is_retained_metrics_object: bool,
}

1474impl Index {
1475    /// The [`GlobalId`] that refers to this Index.
1476    pub fn global_id(&self) -> GlobalId {
1477        self.global_id
1478    }
1479}
1480
1481#[derive(Debug, Clone, Serialize)]
1482pub struct Type {
1483    /// Parse-able SQL that defines this type.
1484    pub create_sql: Option<String>,
1485    /// [`GlobalId`] used to reference this type from outside the catalog.
1486    pub global_id: GlobalId,
1487    #[serde(skip)]
1488    pub details: CatalogTypeDetails<IdReference>,
1489    pub desc: Option<RelationDesc>,
1490    /// Other catalog objects referenced by this type.
1491    pub resolved_ids: ResolvedIds,
1492}
1493
1494#[derive(Debug, Clone, Serialize)]
1495pub struct Func {
1496    /// Static definition of the function.
1497    #[serde(skip)]
1498    pub inner: &'static mz_sql::func::Func,
1499    /// [`GlobalId`] used to reference this function from outside the catalog.
1500    pub global_id: GlobalId,
1501}
1502
1503#[derive(Debug, Clone, Serialize)]
1504pub struct Secret {
1505    /// Parse-able SQL that defines this secret.
1506    pub create_sql: String,
1507    /// [`GlobalId`] used to reference this secret from outside the catalog.
1508    pub global_id: GlobalId,
1509}
1510
1511#[derive(Debug, Clone, Serialize)]
1512pub struct Connection {
1513    /// Parse-able SQL that defines this connection.
1514    pub create_sql: String,
1515    /// [`GlobalId`] used to reference this connection from the storage layer.
1516    pub global_id: GlobalId,
1517    /// The kind of connection.
1518    pub details: ConnectionDetails,
1519    /// Other objects this connection depends on.
1520    pub resolved_ids: ResolvedIds,
1521}
1522
1523impl Connection {
1524    /// The single [`GlobalId`] used to reference this connection.
1525    pub fn global_id(&self) -> GlobalId {
1526        self.global_id
1527    }
1528}
1529
1530#[derive(Debug, Clone, Serialize)]
1531pub struct ContinualTask {
1532    /// Parse-able SQL that defines this continual task.
1533    pub create_sql: String,
1534    /// [`GlobalId`] used to reference this continual task from outside the catalog.
1535    pub global_id: GlobalId,
1536    /// [`GlobalId`] of the collection that we read into this continual task.
1537    pub input_id: GlobalId,
1538    pub with_snapshot: bool,
1539    /// ContinualTasks are self-referential. We make this work by using a
1540    /// placeholder `LocalId` for the CT itself through name resolution and
1541    /// planning. Then we fill in the real `GlobalId` before constructing this
1542    /// catalog item.
1543    pub raw_expr: Arc<HirRelationExpr>,
1544    /// Columns for this continual task.
1545    pub desc: RelationDesc,
1546    /// Other catalog items that this continual task references, determined at name resolution.
1547    pub resolved_ids: ResolvedIds,
1548    /// All of the catalog objects that this continual task depends on, including objects it does not directly reference.
1549    pub dependencies: DependencyIds,
1550    /// Cluster that this continual task runs on.
1551    pub cluster_id: ClusterId,
1552    /// See the comment on [MaterializedView::initial_as_of].
1553    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
1554}
1555
1556impl ContinualTask {
1557    /// The single [`GlobalId`] used to reference this continual task.
1558    pub fn global_id(&self) -> GlobalId {
1559        self.global_id
1560    }
1561}
1562
1563#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
1564pub struct NetworkPolicy {
1565    pub name: String,
1566    pub id: NetworkPolicyId,
1567    pub oid: u32,
1568    pub rules: Vec<NetworkPolicyRule>,
1569    pub owner_id: RoleId,
1570    pub privileges: PrivilegeMap,
1571}
1572
1573impl From<NetworkPolicy> for durable::NetworkPolicy {
1574    fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1575        durable::NetworkPolicy {
1576            id: policy.id,
1577            oid: policy.oid,
1578            name: policy.name,
1579            rules: policy.rules,
1580            owner_id: policy.owner_id,
1581            privileges: policy.privileges.into_all_values().collect(),
1582        }
1583    }
1584}
1585
1586impl From<durable::NetworkPolicy> for NetworkPolicy {
1587    fn from(
1588        durable::NetworkPolicy {
1589            id,
1590            oid,
1591            name,
1592            rules,
1593            owner_id,
1594            privileges,
1595        }: durable::NetworkPolicy,
1596    ) -> Self {
1597        NetworkPolicy {
1598            id,
1599            oid,
1600            name,
1601            rules,
1602            owner_id,
1603            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1604        }
1605    }
1606}
1607
1608impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1609    fn update_from(
1610        &mut self,
1611        durable::NetworkPolicy {
1612            id,
1613            oid,
1614            name,
1615            rules,
1616            owner_id,
1617            privileges,
1618        }: durable::NetworkPolicy,
1619    ) {
1620        self.id = id;
1621        self.oid = oid;
1622        self.name = name;
1623        self.rules = rules;
1624        self.owner_id = owner_id;
1625        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1626    }
1627}
1628
1629impl CatalogItem {
1630    /// Returns a string indicating the type of this catalog entry.
1631    pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1632        match self {
1633            CatalogItem::Table(_) => CatalogItemType::Table,
1634            CatalogItem::Source(_) => CatalogItemType::Source,
1635            CatalogItem::Log(_) => CatalogItemType::Source,
1636            CatalogItem::Sink(_) => CatalogItemType::Sink,
1637            CatalogItem::View(_) => CatalogItemType::View,
1638            CatalogItem::MaterializedView(_) => CatalogItemType::MaterializedView,
1639            CatalogItem::Index(_) => CatalogItemType::Index,
1640            CatalogItem::Type(_) => CatalogItemType::Type,
1641            CatalogItem::Func(_) => CatalogItemType::Func,
1642            CatalogItem::Secret(_) => CatalogItemType::Secret,
1643            CatalogItem::Connection(_) => CatalogItemType::Connection,
1644            CatalogItem::ContinualTask(_) => CatalogItemType::ContinualTask,
1645        }
1646    }
1647
1648    /// Returns all of the [`GlobalId`]s associated with this item.
1649    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1650        let gid = match self {
1651            CatalogItem::Source(source) => source.global_id,
1652            CatalogItem::Log(log) => log.global_id,
1653            CatalogItem::Sink(sink) => sink.global_id,
1654            CatalogItem::View(view) => view.global_id,
1655            CatalogItem::MaterializedView(mv) => {
1656                return itertools::Either::Left(mv.collections.values().copied());
1657            }
1658            CatalogItem::ContinualTask(ct) => ct.global_id,
1659            CatalogItem::Index(index) => index.global_id,
1660            CatalogItem::Func(func) => func.global_id,
1661            CatalogItem::Type(ty) => ty.global_id,
1662            CatalogItem::Secret(secret) => secret.global_id,
1663            CatalogItem::Connection(conn) => conn.global_id,
1664            CatalogItem::Table(table) => {
1665                return itertools::Either::Left(table.collections.values().copied());
1666            }
1667        };
1668        itertools::Either::Right(std::iter::once(gid))
1669    }
1670
1671    /// Returns the most up-to-date [`GlobalId`] for this item.
1672    ///
1673    /// Note: Tables and materialized views are the only types of objects that can have multiple [`GlobalId`]s.
1674    pub fn latest_global_id(&self) -> GlobalId {
1675        match self {
1676            CatalogItem::Source(source) => source.global_id,
1677            CatalogItem::Log(log) => log.global_id,
1678            CatalogItem::Sink(sink) => sink.global_id,
1679            CatalogItem::View(view) => view.global_id,
1680            CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
1681            CatalogItem::ContinualTask(ct) => ct.global_id,
1682            CatalogItem::Index(index) => index.global_id,
1683            CatalogItem::Func(func) => func.global_id,
1684            CatalogItem::Type(ty) => ty.global_id,
1685            CatalogItem::Secret(secret) => secret.global_id,
1686            CatalogItem::Connection(conn) => conn.global_id,
1687            CatalogItem::Table(table) => table.global_id_writes(),
1688        }
1689    }
1690
1691    /// Whether this item represents a storage collection.
1692    pub fn is_storage_collection(&self) -> bool {
1693        match self {
1694            CatalogItem::Table(_)
1695            | CatalogItem::Source(_)
1696            | CatalogItem::MaterializedView(_)
1697            | CatalogItem::Sink(_)
1698            | CatalogItem::ContinualTask(_) => true,
1699            CatalogItem::Log(_)
1700            | CatalogItem::View(_)
1701            | CatalogItem::Index(_)
1702            | CatalogItem::Type(_)
1703            | CatalogItem::Func(_)
1704            | CatalogItem::Secret(_)
1705            | CatalogItem::Connection(_) => false,
1706        }
1707    }
1708
1709    pub fn desc(
1710        &self,
1711        name: &FullItemName,
1712        version: RelationVersionSelector,
1713    ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
1714        self.desc_opt(version)
1715            .ok_or_else(|| SqlCatalogError::InvalidDependency {
1716                name: name.to_string(),
1717                typ: self.typ(),
1718            })
1719    }
1720
1721    pub fn desc_opt(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1722        match &self {
1723            CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1724            CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1725            CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1726            CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1727            CatalogItem::MaterializedView(mview) => {
1728                Some(Cow::Owned(mview.desc.at_version(version)))
1729            }
1730            CatalogItem::Type(typ) => typ.desc.as_ref().map(Cow::Borrowed),
1731            CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
1732            CatalogItem::Func(_)
1733            | CatalogItem::Index(_)
1734            | CatalogItem::Sink(_)
1735            | CatalogItem::Secret(_)
1736            | CatalogItem::Connection(_) => None,
1737        }
1738    }
1739
1740    pub fn func(
1741        &self,
1742        entry: &CatalogEntry,
1743    ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1744        match &self {
1745            CatalogItem::Func(func) => Ok(func.inner),
1746            _ => Err(SqlCatalogError::UnexpectedType {
1747                name: entry.name().item.to_string(),
1748                actual_type: entry.item_type(),
1749                expected_type: CatalogItemType::Func,
1750            }),
1751        }
1752    }
1753
1754    pub fn source_desc(
1755        &self,
1756        entry: &CatalogEntry,
1757    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1758        match &self {
1759            CatalogItem::Source(source) => match &source.data_source {
1760                DataSourceDesc::Ingestion { desc, .. }
1761                | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1762                DataSourceDesc::IngestionExport { .. }
1763                | DataSourceDesc::Introspection(_)
1764                | DataSourceDesc::Webhook { .. }
1765                | DataSourceDesc::Progress => Ok(None),
1766            },
1767            _ => Err(SqlCatalogError::UnexpectedType {
1768                name: entry.name().item.to_string(),
1769                actual_type: entry.item_type(),
1770                expected_type: CatalogItemType::Source,
1771            }),
1772        }
1773    }
1774
1775    /// Reports whether this catalog entry is a progress source.
1776    pub fn is_progress_source(&self) -> bool {
1777        matches!(
1778            self,
1779            CatalogItem::Source(Source {
1780                data_source: DataSourceDesc::Progress,
1781                ..
1782            })
1783        )
1784    }
1785
1786    /// Collects the identifiers of the objects that were encountered when resolving names in the
1787    /// item's DDL statement.
1788    pub fn references(&self) -> &ResolvedIds {
1789        static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1790        match self {
1791            CatalogItem::Func(_) => &*EMPTY,
1792            CatalogItem::Index(idx) => &idx.resolved_ids,
1793            CatalogItem::Sink(sink) => &sink.resolved_ids,
1794            CatalogItem::Source(source) => &source.resolved_ids,
1795            CatalogItem::Log(_) => &*EMPTY,
1796            CatalogItem::Table(table) => &table.resolved_ids,
1797            CatalogItem::Type(typ) => &typ.resolved_ids,
1798            CatalogItem::View(view) => &view.resolved_ids,
1799            CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1800            CatalogItem::Secret(_) => &*EMPTY,
1801            CatalogItem::Connection(connection) => &connection.resolved_ids,
1802            CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
1803        }
1804    }
1805
1806    /// Collects the identifiers of the objects used by this [`CatalogItem`].
1807    ///
1808    /// Like [`CatalogItem::references()`] but also includes objects that are not directly
1809    /// referenced. For example this will include any catalog objects used to implement functions
1810    /// and casts in the item.
1811    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1812        let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1813        match self {
1814            // TODO(jkosh44) This isn't really correct for functions. They may use other objects in
1815            // their implementation. However, currently there's no way to get that information.
1816            CatalogItem::Func(_) => {}
1817            CatalogItem::Index(_) => {}
1818            CatalogItem::Sink(_) => {}
1819            CatalogItem::Source(_) => {}
1820            CatalogItem::Log(_) => {}
1821            CatalogItem::Table(_) => {}
1822            CatalogItem::Type(_) => {}
1823            CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1824            CatalogItem::MaterializedView(mview) => {
1825                uses.extend(mview.dependencies.0.iter().copied())
1826            }
1827            CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
1828            CatalogItem::Secret(_) => {}
1829            CatalogItem::Connection(_) => {}
1830        }
1831        uses
1832    }
1833
1834    /// Returns the connection ID that this item belongs to, if this item is
1835    /// temporary.
1836    pub fn conn_id(&self) -> Option<&ConnectionId> {
1837        match self {
1838            CatalogItem::View(view) => view.conn_id.as_ref(),
1839            CatalogItem::Index(index) => index.conn_id.as_ref(),
1840            CatalogItem::Table(table) => table.conn_id.as_ref(),
1841            CatalogItem::Log(_)
1842            | CatalogItem::Source(_)
1843            | CatalogItem::Sink(_)
1844            | CatalogItem::MaterializedView(_)
1845            | CatalogItem::Secret(_)
1846            | CatalogItem::Type(_)
1847            | CatalogItem::Func(_)
1848            | CatalogItem::Connection(_)
1849            | CatalogItem::ContinualTask(_) => None,
1850        }
1851    }
1852
1853    /// Indicates whether this item is temporary or not.
1854    pub fn is_temporary(&self) -> bool {
1855        self.conn_id().is_some()
1856    }
1857
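    /// Returns a clone of `self` whose `create_sql` has all references to the schema
    /// `database_name.cur_schema_name` rewritten to refer to `new_schema_name` instead,
    /// or an error if the statement cannot be rewritten.
    ///
    /// Hedged sketch of the intended rewrite (hypothetical object names, not literal
    /// output of this function):
    ///
    /// ```text
    /// -- renaming schema `s1` to `s2` in database `db`
    /// CREATE VIEW db.s1.v AS SELECT * FROM db.s1.t
    /// -- becomes
    /// CREATE VIEW db.s2.v AS SELECT * FROM db.s2.t
    /// ```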
1858    pub fn rename_schema_refs(
1859        &self,
1860        database_name: &str,
1861        cur_schema_name: &str,
1862        new_schema_name: &str,
1863    ) -> Result<CatalogItem, (String, String)> {
1864        let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
1865            let mut create_stmt = mz_sql::parse::parse(&create_sql)
1866                .expect("invalid create sql persisted to catalog")
1867                .into_element()
1868                .ast;
1869
1870            // Rename all references to cur_schema_name.
1871            mz_sql::ast::transform::create_stmt_rename_schema_refs(
1872                &mut create_stmt,
1873                database_name,
1874                cur_schema_name,
1875                new_schema_name,
1876            )?;
1877
1878            Ok(create_stmt.to_ast_string_stable())
1879        };
1880
1881        match self {
1882            CatalogItem::Table(i) => {
1883                let mut i = i.clone();
1884                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1885                Ok(CatalogItem::Table(i))
1886            }
1887            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1888            CatalogItem::Source(i) => {
1889                let mut i = i.clone();
1890                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1891                Ok(CatalogItem::Source(i))
1892            }
1893            CatalogItem::Sink(i) => {
1894                let mut i = i.clone();
1895                i.create_sql = do_rewrite(i.create_sql)?;
1896                Ok(CatalogItem::Sink(i))
1897            }
1898            CatalogItem::View(i) => {
1899                let mut i = i.clone();
1900                i.create_sql = do_rewrite(i.create_sql)?;
1901                Ok(CatalogItem::View(i))
1902            }
1903            CatalogItem::MaterializedView(i) => {
1904                let mut i = i.clone();
1905                i.create_sql = do_rewrite(i.create_sql)?;
1906                Ok(CatalogItem::MaterializedView(i))
1907            }
1908            CatalogItem::Index(i) => {
1909                let mut i = i.clone();
1910                i.create_sql = do_rewrite(i.create_sql)?;
1911                Ok(CatalogItem::Index(i))
1912            }
1913            CatalogItem::Secret(i) => {
1914                let mut i = i.clone();
1915                i.create_sql = do_rewrite(i.create_sql)?;
1916                Ok(CatalogItem::Secret(i))
1917            }
1918            CatalogItem::Connection(i) => {
1919                let mut i = i.clone();
1920                i.create_sql = do_rewrite(i.create_sql)?;
1921                Ok(CatalogItem::Connection(i))
1922            }
1923            CatalogItem::Type(i) => {
1924                let mut i = i.clone();
1925                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1926                Ok(CatalogItem::Type(i))
1927            }
1928            CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
1929            CatalogItem::ContinualTask(i) => {
1930                let mut i = i.clone();
1931                i.create_sql = do_rewrite(i.create_sql)?;
1932                Ok(CatalogItem::ContinualTask(i))
1933            }
1934        }
1935    }
1936
1937    /// Returns a clone of `self` with all instances of `from` renamed to `to`
1938    /// (with the option of including the item's own name), or errors if the request
1939    /// is ambiguous.
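    ///
    /// Hedged sketch of the intended rewrite (hypothetical object names, not literal output of
    /// this function):
    ///
    /// ```text
    /// -- renaming item `t` to `t_new` with `rename_self = false`
    /// CREATE VIEW v AS SELECT * FROM t
    /// -- becomes
    /// CREATE VIEW v AS SELECT * FROM t_new
    /// ```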
1940    pub fn rename_item_refs(
1941        &self,
1942        from: FullItemName,
1943        to_item_name: String,
1944        rename_self: bool,
1945    ) -> Result<CatalogItem, String> {
1946        let do_rewrite = |create_sql: String| -> Result<String, String> {
1947            let mut create_stmt = mz_sql::parse::parse(&create_sql)
1948                .expect("invalid create sql persisted to catalog")
1949                .into_element()
1950                .ast;
1951            if rename_self {
1952                mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
1953            }
1954            // Determination of what constitutes an ambiguous request is done here.
1955            mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
1956            Ok(create_stmt.to_ast_string_stable())
1957        };
1958
1959        match self {
1960            CatalogItem::Table(i) => {
1961                let mut i = i.clone();
1962                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1963                Ok(CatalogItem::Table(i))
1964            }
1965            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1966            CatalogItem::Source(i) => {
1967                let mut i = i.clone();
1968                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1969                Ok(CatalogItem::Source(i))
1970            }
1971            CatalogItem::Sink(i) => {
1972                let mut i = i.clone();
1973                i.create_sql = do_rewrite(i.create_sql)?;
1974                Ok(CatalogItem::Sink(i))
1975            }
1976            CatalogItem::View(i) => {
1977                let mut i = i.clone();
1978                i.create_sql = do_rewrite(i.create_sql)?;
1979                Ok(CatalogItem::View(i))
1980            }
1981            CatalogItem::MaterializedView(i) => {
1982                let mut i = i.clone();
1983                i.create_sql = do_rewrite(i.create_sql)?;
1984                Ok(CatalogItem::MaterializedView(i))
1985            }
1986            CatalogItem::Index(i) => {
1987                let mut i = i.clone();
1988                i.create_sql = do_rewrite(i.create_sql)?;
1989                Ok(CatalogItem::Index(i))
1990            }
1991            CatalogItem::Secret(i) => {
1992                let mut i = i.clone();
1993                i.create_sql = do_rewrite(i.create_sql)?;
1994                Ok(CatalogItem::Secret(i))
1995            }
1996            CatalogItem::Func(_) | CatalogItem::Type(_) => {
1997                unreachable!("{}s cannot be renamed", self.typ())
1998            }
1999            CatalogItem::Connection(i) => {
2000                let mut i = i.clone();
2001                i.create_sql = do_rewrite(i.create_sql)?;
2002                Ok(CatalogItem::Connection(i))
2003            }
2004            CatalogItem::ContinualTask(i) => {
2005                let mut i = i.clone();
2006                i.create_sql = do_rewrite(i.create_sql)?;
2007                Ok(CatalogItem::ContinualTask(i))
2008            }
2009        }
2010    }
2011
2012    /// Updates the retain history for an item. Returns the previous retain history value. Returns
2013    /// an error if this item does not support retain history.
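    ///
    /// Hedged sketch of the effect on the stored SQL (hypothetical statement; the exact
    /// rendering of the option is up to the AST printer):
    ///
    /// ```text
    /// CREATE TABLE t (a int)
    /// -- after update_retain_history(Some(value), window)
    /// CREATE TABLE t (a int) WITH (RETAIN HISTORY = FOR <value>)
    /// -- and after update_retain_history(None, window) the option is removed again
    /// ```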
2014    pub fn update_retain_history(
2015        &mut self,
2016        value: Option<Value>,
2017        window: CompactionWindow,
2018    ) -> Result<Option<WithOptionValue<Raw>>, ()> {
2019        let update = |mut ast: &mut Statement<Raw>| {
2020            // Each statement type has unique option types. This macro handles them commonly.
2021            macro_rules! update_retain_history {
2022                ( $stmt:ident, $opt:ident, $name:ident ) => {{
2023                    // Replace or add the option.
2024                    let pos = $stmt
2025                        .with_options
2026                        .iter()
2027                        // In case there are ever multiple, look for the last one.
2028                        .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
2029                    if let Some(value) = value {
2030                        let next = mz_sql_parser::ast::$opt {
2031                            name: mz_sql_parser::ast::$name::RetainHistory,
2032                            value: Some(WithOptionValue::RetainHistoryFor(value)),
2033                        };
2034                        if let Some(idx) = pos {
2035                            let previous = $stmt.with_options[idx].clone();
2036                            $stmt.with_options[idx] = next;
2037                            previous.value
2038                        } else {
2039                            $stmt.with_options.push(next);
2040                            None
2041                        }
2042                    } else {
2043                        if let Some(idx) = pos {
2044                            $stmt.with_options.swap_remove(idx).value
2045                        } else {
2046                            None
2047                        }
2048                    }
2049                }};
2050            }
2051            let previous = match &mut ast {
2052                Statement::CreateTable(stmt) => {
2053                    update_retain_history!(stmt, TableOption, TableOptionName)
2054                }
2055                Statement::CreateIndex(stmt) => {
2056                    update_retain_history!(stmt, IndexOption, IndexOptionName)
2057                }
2058                Statement::CreateSource(stmt) => {
2059                    update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
2060                }
2061                Statement::CreateMaterializedView(stmt) => {
2062                    update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
2063                }
2064                _ => {
2065                    return Err(());
2066                }
2067            };
2068            Ok(previous)
2069        };
2070
2071        let res = self.update_sql(update)?;
2072        let cw = self
2073            .custom_logical_compaction_window_mut()
2074            .expect("item must have compaction window");
2075        *cw = Some(window);
2076        Ok(res)
2077    }
2078
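    /// Adds the column `name` of type `typ` to this item's [`VersionedRelationDesc`] and appends a
    /// corresponding versioned column definition to its `create_sql`, returning the new
    /// [`RelationVersion`]. Only tables support this; any other item type returns
    /// [`PlanError::Unsupported`].
    ///
    /// Hedged sketch of the intended effect (hypothetical SQL; the exact rendering of the
    /// versioned column option is up to the AST printer):
    ///
    /// ```text
    /// CREATE TABLE t (a int)
    /// -- after add_column("b", <text>, <raw text type>): a column definition for `b`
    /// -- tagged as "added at version <next_version>" is appended to the stored statement
    /// ```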
2079    pub fn add_column(
2080        &mut self,
2081        name: ColumnName,
2082        typ: SqlColumnType,
2083        sql: RawDataType,
2084    ) -> Result<RelationVersion, PlanError> {
2085        let CatalogItem::Table(table) = self else {
2086            return Err(PlanError::Unsupported {
2087                feature: "adding columns to a non-Table".to_string(),
2088                discussion_no: None,
2089            });
2090        };
2091        let next_version = table.desc.add_column(name.clone(), typ);
2092
2093        let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2094            Statement::CreateTable(stmt) => {
2095                let version = ColumnOptionDef {
2096                    name: None,
2097                    option: ColumnOption::Versioned {
2098                        action: ColumnVersioned::Added,
2099                        version: next_version.into(),
2100                    },
2101                };
2102                let column = ColumnDef {
2103                    name: name.into(),
2104                    data_type: sql,
2105                    collation: None,
2106                    options: vec![version],
2107                };
2108                stmt.columns.push(column);
2109                Ok(())
2110            }
2111            _ => Err(()),
2112        };
2113
2114        self.update_sql(update)
2115            .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2116        Ok(next_version)
2117    }
2118
2119    /// Updates the `create_sql` field of this item. Returns an error if this item has no stored
2120    /// SQL definition (e.g., builtin items); otherwise returns `f`'s result.
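    ///
    /// Hedged usage sketch (not a doctest; `item` stands for some non-builtin
    /// [`CatalogItem`]):
    ///
    /// ```ignore
    /// // A no-op rewrite: parse the stored SQL, change nothing, and write back the
    /// // stable AST rendering.
    /// item.update_sql(|_stmt| Ok(()))?;
    /// ```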
2121    pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2122    where
2123        F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2124    {
2125        let create_sql = match self {
2126            CatalogItem::Table(Table { create_sql, .. })
2127            | CatalogItem::Type(Type { create_sql, .. })
2128            | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2129            CatalogItem::Sink(Sink { create_sql, .. })
2130            | CatalogItem::View(View { create_sql, .. })
2131            | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2132            | CatalogItem::Index(Index { create_sql, .. })
2133            | CatalogItem::Secret(Secret { create_sql, .. })
2134            | CatalogItem::Connection(Connection { create_sql, .. })
2135            | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
2136            CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2137        };
2138        let Some(create_sql) = create_sql else {
2139            return Err(());
2140        };
2141        let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2142            .expect("non-system items must be parseable")
2143            .into_element()
2144            .ast;
2145        debug!("rewrite: {}", ast.to_ast_string_redacted());
2146        let t = f(&mut ast)?;
2147        *create_sql = ast.to_ast_string_stable();
2148        debug!("rewrote: {}", ast.to_ast_string_redacted());
2149        Ok(t)
2150    }
2151
2152    /// If the object is considered a "compute object"
2153    /// (i.e., it is managed by the compute controller),
2154    /// this function returns its cluster ID. Otherwise, it returns `None`.
2155    ///
2156    /// This function differs from `cluster_id` because while all
2157    /// compute objects run on a cluster, the converse is not true.
2158    pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2159        match self {
2160            CatalogItem::Index(index) => Some(index.cluster_id),
2161            CatalogItem::Table(_)
2162            | CatalogItem::Source(_)
2163            | CatalogItem::Log(_)
2164            | CatalogItem::View(_)
2165            | CatalogItem::MaterializedView(_)
2166            | CatalogItem::Sink(_)
2167            | CatalogItem::Type(_)
2168            | CatalogItem::Func(_)
2169            | CatalogItem::Secret(_)
2170            | CatalogItem::Connection(_)
2171            | CatalogItem::ContinualTask(_) => None,
2172        }
2173    }
2174
2175    pub fn cluster_id(&self) -> Option<ClusterId> {
2176        match self {
2177            CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2178            CatalogItem::Index(index) => Some(index.cluster_id),
2179            CatalogItem::Source(source) => match &source.data_source {
2180                DataSourceDesc::Ingestion { cluster_id, .. }
2181                | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
2182                // This is somewhat of a lie because the export runs on the same
2183                // cluster as its ingestion but we don't yet have a way of
2184                // cross-referencing the items
2185                DataSourceDesc::IngestionExport { .. } => None,
2186                DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2187                DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => None,
2188            },
2189            CatalogItem::Sink(sink) => Some(sink.cluster_id),
2190            CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
2191            CatalogItem::Table(_)
2192            | CatalogItem::Log(_)
2193            | CatalogItem::View(_)
2194            | CatalogItem::Type(_)
2195            | CatalogItem::Func(_)
2196            | CatalogItem::Secret(_)
2197            | CatalogItem::Connection(_) => None,
2198        }
2199    }
2200
2201    /// The custom compaction window, if any has been set. This does not reflect any propagated
2202    /// compaction window (i.e., source -> subsource).
2203    pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2204        match self {
2205            CatalogItem::Table(table) => table.custom_logical_compaction_window,
2206            CatalogItem::Source(source) => source.custom_logical_compaction_window,
2207            CatalogItem::Index(index) => index.custom_logical_compaction_window,
2208            CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2209            CatalogItem::Log(_)
2210            | CatalogItem::View(_)
2211            | CatalogItem::Sink(_)
2212            | CatalogItem::Type(_)
2213            | CatalogItem::Func(_)
2214            | CatalogItem::Secret(_)
2215            | CatalogItem::Connection(_)
2216            | CatalogItem::ContinualTask(_) => None,
2217        }
2218    }
2219
2220    /// Mutable access to the custom compaction window, or None if this type does not support custom
2221    /// compaction windows. This does not reflect any propagated compaction window (i.e., source ->
2222    /// subsource).
2223    pub fn custom_logical_compaction_window_mut(
2224        &mut self,
2225    ) -> Option<&mut Option<CompactionWindow>> {
2226        let cw = match self {
2227            CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2228            CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2229            CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2230            CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2231            CatalogItem::Log(_)
2232            | CatalogItem::View(_)
2233            | CatalogItem::Sink(_)
2234            | CatalogItem::Type(_)
2235            | CatalogItem::Func(_)
2236            | CatalogItem::Secret(_)
2237            | CatalogItem::Connection(_)
2238            | CatalogItem::ContinualTask(_) => return None,
2239        };
2240        Some(cw)
2241    }
2242
2243    /// The initial compaction window, for objects that have one; that is, tables, sources, indexes,
2244    /// MVs, and continual tasks. This does not reflect any propagated compaction window (i.e., source -> subsource).
2245    ///
2246    /// If `custom_logical_compaction_window()` returns something, use that.  Otherwise, use a
2247    /// sensible default (currently 1s).
2248    ///
2249    /// For objects that do not have the concept of compaction window, return None.
2250    pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2251        let custom_logical_compaction_window = match self {
2252            CatalogItem::Table(_)
2253            | CatalogItem::Source(_)
2254            | CatalogItem::Index(_)
2255            | CatalogItem::MaterializedView(_)
2256            | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
2257            CatalogItem::Log(_)
2258            | CatalogItem::View(_)
2259            | CatalogItem::Sink(_)
2260            | CatalogItem::Type(_)
2261            | CatalogItem::Func(_)
2262            | CatalogItem::Secret(_)
2263            | CatalogItem::Connection(_) => return None,
2264        };
2265        Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2266    }
2267
2268    /// Whether the item's logical compaction window is controlled by the
2269    /// `METRICS_RETENTION` system variable.
2271    pub fn is_retained_metrics_object(&self) -> bool {
2272        match self {
2273            CatalogItem::Table(table) => table.is_retained_metrics_object,
2274            CatalogItem::Source(source) => source.is_retained_metrics_object,
2275            CatalogItem::Index(index) => index.is_retained_metrics_object,
2276            CatalogItem::Log(_)
2277            | CatalogItem::View(_)
2278            | CatalogItem::MaterializedView(_)
2279            | CatalogItem::Sink(_)
2280            | CatalogItem::Type(_)
2281            | CatalogItem::Func(_)
2282            | CatalogItem::Secret(_)
2283            | CatalogItem::Connection(_)
2284            | CatalogItem::ContinualTask(_) => false,
2285        }
2286    }
2287
2288    pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2289        match self {
2290            CatalogItem::Table(table) => {
2291                let create_sql = table
2292                    .create_sql
2293                    .clone()
2294                    .expect("builtin tables cannot be serialized");
2295                let mut collections = table.collections.clone();
2296                let global_id = collections
2297                    .remove(&RelationVersion::root())
2298                    .expect("at least one version");
2299                (create_sql, global_id, collections)
2300            }
2301            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2302            CatalogItem::Source(source) => {
2303                assert!(
2304                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2305                    "cannot serialize introspection/builtin sources",
2306                );
2307                let create_sql = source
2308                    .create_sql
2309                    .clone()
2310                    .expect("builtin sources cannot be serialized");
2311                (create_sql, source.global_id, BTreeMap::new())
2312            }
2313            CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2314            CatalogItem::MaterializedView(mview) => {
2315                let mut collections = mview.collections.clone();
2316                let global_id = collections
2317                    .remove(&RelationVersion::root())
2318                    .expect("at least one version");
2319                (mview.create_sql.clone(), global_id, collections)
2320            }
2321            CatalogItem::Index(index) => {
2322                (index.create_sql.clone(), index.global_id, BTreeMap::new())
2323            }
2324            CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2325            CatalogItem::Type(typ) => {
2326                let create_sql = typ
2327                    .create_sql
2328                    .clone()
2329                    .expect("builtin types cannot be serialized");
2330                (create_sql, typ.global_id, BTreeMap::new())
2331            }
2332            CatalogItem::Secret(secret) => {
2333                (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2334            }
2335            CatalogItem::Connection(connection) => (
2336                connection.create_sql.clone(),
2337                connection.global_id,
2338                BTreeMap::new(),
2339            ),
2340            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2341            CatalogItem::ContinualTask(ct) => {
2342                (ct.create_sql.clone(), ct.global_id, BTreeMap::new())
2343            }
2344        }
2345    }
2346
2347    pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2348        match self {
2349            CatalogItem::Table(mut table) => {
2350                let create_sql = table
2351                    .create_sql
2352                    .expect("builtin tables cannot be serialized");
2353                let global_id = table
2354                    .collections
2355                    .remove(&RelationVersion::root())
2356                    .expect("at least one version");
2357                (create_sql, global_id, table.collections)
2358            }
2359            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2360            CatalogItem::Source(source) => {
2361                assert!(
2362                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2363                    "cannot serialize introspection/builtin sources",
2364                );
2365                let create_sql = source
2366                    .create_sql
2367                    .expect("builtin sources cannot be serialized");
2368                (create_sql, source.global_id, BTreeMap::new())
2369            }
2370            CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2371            CatalogItem::MaterializedView(mut mview) => {
2372                let global_id = mview
2373                    .collections
2374                    .remove(&RelationVersion::root())
2375                    .expect("at least one version");
2376                (mview.create_sql, global_id, mview.collections)
2377            }
2378            CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2379            CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2380            CatalogItem::Type(typ) => {
2381                let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2382                (create_sql, typ.global_id, BTreeMap::new())
2383            }
2384            CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2385            CatalogItem::Connection(connection) => {
2386                (connection.create_sql, connection.global_id, BTreeMap::new())
2387            }
2388            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2389            CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
2390        }
2391    }
2392
2393    /// Returns the [`GlobalId`] for the given version selector. Returns `None` only if the item
2394    /// is versioned (a table or materialized view) and the requested version does not exist.
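    ///
    /// Illustrative sketch only (plain `std` types rather than the real version map):
    /// "latest" selects the entry with the greatest key, while a specific version is a
    /// direct lookup.
    ///
    /// ```
    /// use std::collections::BTreeMap;
    ///
    /// let mut versions = BTreeMap::new();
    /// versions.insert(0u64, "gid_v0");
    /// versions.insert(1u64, "gid_v1");
    /// assert_eq!(versions.values().last().copied(), Some("gid_v1"));
    /// assert_eq!(versions.get(&0u64).copied(), Some("gid_v0"));
    /// ```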
2395    pub fn global_id_for_version(&self, version: RelationVersionSelector) -> Option<GlobalId> {
2396        let collections = match self {
2397            CatalogItem::MaterializedView(mv) => &mv.collections,
2398            CatalogItem::Table(table) => &table.collections,
2399            CatalogItem::Source(source) => return Some(source.global_id),
2400            CatalogItem::Log(log) => return Some(log.global_id),
2401            CatalogItem::View(view) => return Some(view.global_id),
2402            CatalogItem::Sink(sink) => return Some(sink.global_id),
2403            CatalogItem::Index(index) => return Some(index.global_id),
2404            CatalogItem::Type(ty) => return Some(ty.global_id),
2405            CatalogItem::Func(func) => return Some(func.global_id),
2406            CatalogItem::Secret(secret) => return Some(secret.global_id),
2407            CatalogItem::Connection(conn) => return Some(conn.global_id),
2408            CatalogItem::ContinualTask(ct) => return Some(ct.global_id),
2409        };
2410        match version {
2411            RelationVersionSelector::Latest => collections.values().last().copied(),
2412            RelationVersionSelector::Specific(version) => collections.get(&version).copied(),
2413        }
2414    }
2415}
2416
2417impl CatalogEntry {
2418    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`],
2419    /// returning an error if this [`CatalogEntry`] does not produce rows.
2420    ///
2421    /// If you need to get the [`RelationDesc`] for a specific version, see [`CatalogItem::desc`].
2422    pub fn desc_latest(
2423        &self,
2424        name: &FullItemName,
2425    ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
2426        self.item.desc(name, RelationVersionSelector::Latest)
2427    }
2428
2429    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`], if it
2430    /// produces rows.
2431    pub fn desc_opt_latest(&self) -> Option<Cow<'_, RelationDesc>> {
2432        self.item.desc_opt(RelationVersionSelector::Latest)
2433    }
2434
2435    /// Reports whether the item has columns.
2436    pub fn has_columns(&self) -> bool {
2437        self.desc_opt_latest().is_some()
2438    }
2439
2440    /// Returns the [`mz_sql::func::Func`] associated with this `CatalogEntry`.
2441    pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2442        self.item.func(self)
2443    }
2444
2445    /// Returns the inner [`Index`] if this entry is an index, else `None`.
2446    pub fn index(&self) -> Option<&Index> {
2447        match self.item() {
2448            CatalogItem::Index(idx) => Some(idx),
2449            _ => None,
2450        }
2451    }
2452
2453    /// Returns the inner [`MaterializedView`] if this entry is a materialized view, else `None`.
2454    pub fn materialized_view(&self) -> Option<&MaterializedView> {
2455        match self.item() {
2456            CatalogItem::MaterializedView(mv) => Some(mv),
2457            _ => None,
2458        }
2459    }
2460
2461    /// Returns the inner [`Table`] if this entry is a table, else `None`.
2462    pub fn table(&self) -> Option<&Table> {
2463        match self.item() {
2464            CatalogItem::Table(tbl) => Some(tbl),
2465            _ => None,
2466        }
2467    }
2468
2469    /// Returns the inner [`Source`] if this entry is a source, else `None`.
2470    pub fn source(&self) -> Option<&Source> {
2471        match self.item() {
2472            CatalogItem::Source(src) => Some(src),
2473            _ => None,
2474        }
2475    }
2476
2477    /// Returns the inner [`Sink`] if this entry is a sink, else `None`.
2478    pub fn sink(&self) -> Option<&Sink> {
2479        match self.item() {
2480            CatalogItem::Sink(sink) => Some(sink),
2481            _ => None,
2482        }
2483    }
2484
2485    /// Returns the inner [`Secret`] if this entry is a secret, else `None`.
2486    pub fn secret(&self) -> Option<&Secret> {
2487        match self.item() {
2488            CatalogItem::Secret(secret) => Some(secret),
2489            _ => None,
2490        }
2491    }
2492
2493    pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2494        match self.item() {
2495            CatalogItem::Connection(connection) => Ok(connection),
2496            _ => {
2497                let db_name = match self.name().qualifiers.database_spec {
2498                    ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2499                    ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2500                };
2501                Err(SqlCatalogError::UnknownConnection(format!(
2502                    "{}{}.{}",
2503                    db_name,
2504                    self.name().qualifiers.schema_spec,
2505                    self.name().item
2506                )))
2507            }
2508        }
2509    }
2510
2511    /// Returns the [`mz_storage_types::sources::SourceDesc`] associated with
2512    /// this `CatalogEntry`, if any.
2513    pub fn source_desc(
2514        &self,
2515    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2516        self.item.source_desc(self)
2517    }
2518
2519    /// Reports whether this catalog entry is a connection.
2520    pub fn is_connection(&self) -> bool {
2521        matches!(self.item(), CatalogItem::Connection(_))
2522    }
2523
2524    /// Reports whether this catalog entry is a table.
2525    pub fn is_table(&self) -> bool {
2526        matches!(self.item(), CatalogItem::Table(_))
2527    }
2528
2529    /// Reports whether this catalog entry is a source. Note that this includes
2530    /// subsources.
2531    pub fn is_source(&self) -> bool {
2532        matches!(self.item(), CatalogItem::Source(_))
2533    }
2534
2535    /// Reports whether this catalog entry is a subsource and, if so, returns the ingestion it is
2536    /// an export of, the external reference it exports, and the export details.
2537    pub fn subsource_details(
2538        &self,
2539    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2540        match &self.item() {
2541            CatalogItem::Source(source) => match &source.data_source {
2542                DataSourceDesc::IngestionExport {
2543                    ingestion_id,
2544                    external_reference,
2545                    details,
2546                    data_config: _,
2547                } => Some((*ingestion_id, external_reference, details)),
2548                _ => None,
2549            },
2550            _ => None,
2551        }
2552    }
2553
2554    /// Reports whether this catalog entry is a source export and, if so, returns the ingestion it
2555    /// is an export of, the external reference, the export details, and the data config.
2556    pub fn source_export_details(
2557        &self,
2558    ) -> Option<(
2559        CatalogItemId,
2560        &UnresolvedItemName,
2561        &SourceExportDetails,
2562        &SourceExportDataConfig<ReferencedConnection>,
2563    )> {
2564        match &self.item() {
2565            CatalogItem::Source(source) => match &source.data_source {
2566                DataSourceDesc::IngestionExport {
2567                    ingestion_id,
2568                    external_reference,
2569                    details,
2570                    data_config,
2571                } => Some((*ingestion_id, external_reference, details, data_config)),
2572                _ => None,
2573            },
2574            CatalogItem::Table(table) => match &table.data_source {
2575                TableDataSource::DataSource {
2576                    desc:
2577                        DataSourceDesc::IngestionExport {
2578                            ingestion_id,
2579                            external_reference,
2580                            details,
2581                            data_config,
2582                        },
2583                    timeline: _,
2584                } => Some((*ingestion_id, external_reference, details, data_config)),
2585                _ => None,
2586            },
2587            _ => None,
2588        }
2589    }
2590
2591    /// Reports whether this catalog entry is a progress source.
2592    pub fn is_progress_source(&self) -> bool {
2593        self.item().is_progress_source()
2594    }
2595
2596    /// Returns the [`CatalogItemId`] of this entry's progress collection, if it has one.
2597    pub fn progress_id(&self) -> Option<CatalogItemId> {
2598        match &self.item() {
2599            CatalogItem::Source(source) => match &source.data_source {
2600                DataSourceDesc::Ingestion { .. } => Some(self.id),
2601                DataSourceDesc::OldSyntaxIngestion {
2602                    progress_subsource, ..
2603                } => Some(*progress_subsource),
2604                DataSourceDesc::IngestionExport { .. }
2605                | DataSourceDesc::Introspection(_)
2606                | DataSourceDesc::Progress
2607                | DataSourceDesc::Webhook { .. } => None,
2608            },
2609            CatalogItem::Table(_)
2610            | CatalogItem::Log(_)
2611            | CatalogItem::View(_)
2612            | CatalogItem::MaterializedView(_)
2613            | CatalogItem::Sink(_)
2614            | CatalogItem::Index(_)
2615            | CatalogItem::Type(_)
2616            | CatalogItem::Func(_)
2617            | CatalogItem::Secret(_)
2618            | CatalogItem::Connection(_)
2619            | CatalogItem::ContinualTask(_) => None,
2620        }
2621    }
2622
2623    /// Reports whether this catalog entry is a sink.
2624    pub fn is_sink(&self) -> bool {
2625        matches!(self.item(), CatalogItem::Sink(_))
2626    }
2627
2628    /// Reports whether this catalog entry is a materialized view.
2629    pub fn is_materialized_view(&self) -> bool {
2630        matches!(self.item(), CatalogItem::MaterializedView(_))
2631    }
2632
2633    /// Reports whether this catalog entry is a view.
2634    pub fn is_view(&self) -> bool {
2635        matches!(self.item(), CatalogItem::View(_))
2636    }
2637
2638    /// Reports whether this catalog entry is a secret.
2639    pub fn is_secret(&self) -> bool {
2640        matches!(self.item(), CatalogItem::Secret(_))
2641    }
2642
2643    /// Reports whether this catalog entry is an introspection source.
2644    pub fn is_introspection_source(&self) -> bool {
2645        matches!(self.item(), CatalogItem::Log(_))
2646    }
2647
2648    /// Reports whether this catalog entry is an index.
2649    pub fn is_index(&self) -> bool {
2650        matches!(self.item(), CatalogItem::Index(_))
2651    }
2652
2653    /// Reports whether this catalog entry is a continual task.
2654    pub fn is_continual_task(&self) -> bool {
2655        matches!(self.item(), CatalogItem::ContinualTask(_))
2656    }
2657
2658    /// Reports whether this catalog entry can be treated as a relation, i.e., whether it can produce rows.
2659    pub fn is_relation(&self) -> bool {
2660        mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2661    }
2662
2663    /// Collects the identifiers of the objects that were encountered when
2664    /// resolving names in the item's DDL statement.
2665    pub fn references(&self) -> &ResolvedIds {
2666        self.item.references()
2667    }
2668
2669    /// Collects the identifiers of the objects used by this [`CatalogEntry`].
2670    ///
2671    /// Like [`CatalogEntry::references()`] but also includes objects that are not directly
2672    /// referenced. For example this will include any catalog objects used to implement functions
2673    /// and casts in the item.
2674    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2675        self.item.uses()
2676    }
2677
2678    /// Returns the `CatalogItem` associated with this catalog entry.
2679    pub fn item(&self) -> &CatalogItem {
2680        &self.item
2681    }
2682
2683    /// Returns the [`CatalogItemId`] of this catalog entry.
2684    pub fn id(&self) -> CatalogItemId {
2685        self.id
2686    }
2687
2688    /// Returns all of the [`GlobalId`]s associated with this item.
2689    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2690        self.item().global_ids()
2691    }
2692
2693    pub fn latest_global_id(&self) -> GlobalId {
2694        self.item().latest_global_id()
2695    }
2696
2697    /// Returns the OID of this catalog entry.
2698    pub fn oid(&self) -> u32 {
2699        self.oid
2700    }
2701
2702    /// Returns the fully qualified name of this catalog entry.
2703    pub fn name(&self) -> &QualifiedItemName {
2704        &self.name
2705    }
2706
2707    /// Returns the identifiers of the dataflows that directly reference this dataflow.
2708    pub fn referenced_by(&self) -> &[CatalogItemId] {
2709        &self.referenced_by
2710    }
2711
2712    /// Returns the identifiers of the dataflows that depend upon this dataflow.
2713    pub fn used_by(&self) -> &[CatalogItemId] {
2714        &self.used_by
2715    }
2716
2717    /// Returns the connection ID that this item belongs to, if this item is
2718    /// temporary.
2719    pub fn conn_id(&self) -> Option<&ConnectionId> {
2720        self.item.conn_id()
2721    }
2722
2723    /// Returns the role ID of the entry owner.
2724    pub fn owner_id(&self) -> &RoleId {
2725        &self.owner_id
2726    }
2727
2728    /// Returns the privileges of the entry.
2729    pub fn privileges(&self) -> &PrivilegeMap {
2730        &self.privileges
2731    }
2732}
2733
2734#[derive(Debug, Clone, Default)]
2735pub struct CommentsMap {
2736    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
2737}
2738
2739impl CommentsMap {
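    /// Sets, replaces, or removes the comment on `object_id` (or on one of its sub-components,
    /// e.g. a column, when `sub_component` is `Some`), returning the previous comment if one
    /// existed. Passing `comment: None` removes the stored comment.
    ///
    /// Hedged sketch (hypothetical `table_id`):
    ///
    /// ```ignore
    /// let mut comments = CommentsMap::default();
    /// assert_eq!(comments.update_comment(table_id, None, Some("fact table".into())), None);
    /// // Replacing the comment returns the previous one.
    /// let prev = comments.update_comment(table_id, None, Some("facts".into()));
    /// assert_eq!(prev, Some("fact table".to_string()));
    /// // Passing `None` removes it.
    /// assert_eq!(comments.update_comment(table_id, None, None), Some("facts".to_string()));
    /// ```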
2740    pub fn update_comment(
2741        &mut self,
2742        object_id: CommentObjectId,
2743        sub_component: Option<usize>,
2744        comment: Option<String>,
2745    ) -> Option<String> {
2746        let object_comments = self.map.entry(object_id).or_default();
2747
2748        // Either replace the existing comment, or remove it if comment is None/NULL.
2749        let (empty, prev) = if let Some(comment) = comment {
2750            let prev = object_comments.insert(sub_component, comment);
2751            (false, prev)
2752        } else {
2753            let prev = object_comments.remove(&sub_component);
2754            (object_comments.is_empty(), prev)
2755        };
2756
2757        // Cleanup entries that are now empty.
2758        if empty {
2759            self.map.remove(&object_id);
2760        }
2761
2762        // Return the previous comment, if there was one, for easy removal.
2763        prev
2764    }
2765
2766    /// Remove all comments for `object_id` from the map.
2767    ///
2768    /// Generally there is one comment for a given [`CommentObjectId`], but in the case of
2769    /// relations you can also have comments on the individual columns. Dropping the comments for a
2770    /// relation will also drop all of the comments on any columns.
2771    pub fn drop_comments(
2772        &mut self,
2773        object_ids: &BTreeSet<CommentObjectId>,
2774    ) -> Vec<(CommentObjectId, Option<usize>, String)> {
2775        let mut removed_comments = Vec::new();
2776
2777        for object_id in object_ids {
2778            if let Some(comments) = self.map.remove(object_id) {
2779                let removed = comments
2780                    .into_iter()
2781                    .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
2782                removed_comments.extend(removed);
2783            }
2784        }
2785
2786        removed_comments
2787    }
2788
2789    pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
2790        self.map
2791            .iter()
2792            .flat_map(|(id, comments)| {
2793                comments
2794                    .iter()
2795                    .map(|(pos, comment)| (*id, *pos, comment.as_str()))
2796            })
2798    }
2799
2800    pub fn get_object_comments(
2801        &self,
2802        object_id: CommentObjectId,
2803    ) -> Option<&BTreeMap<Option<usize>, String>> {
2804        self.map.get(&object_id)
2805    }
2806}
2807
2808impl Serialize for CommentsMap {
2809    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2810    where
2811        S: serde::Serializer,
2812    {
2813        let comment_count = self
2814            .map
2815            .iter()
2816            .map(|(_object_id, comments)| comments.len())
2817            .sum();
2818
2819        let mut seq = serializer.serialize_seq(Some(comment_count))?;
2820        for (object_id, sub) in &self.map {
2821            for (sub_component, comment) in sub {
2822                seq.serialize_element(&(
2823                    format!("{object_id:?}"),
2824                    format!("{sub_component:?}"),
2825                    comment,
2826                ))?;
2827            }
2828        }
2829        seq.end()
2830    }
2831}
2832
2833#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
2834pub struct DefaultPrivileges {
2835    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
2836    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
2837}
2838
2839// Use a newtype here because otherwise we would have two levels of `BTreeMap`, both needing
2840// `map_key_to_string`.
2841#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
2842struct RoleDefaultPrivileges(
2843    /// Denormalized; the key is the grantee role.
2844    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
2845    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
2846);
2847
2848impl Deref for RoleDefaultPrivileges {
2849    type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
2850
2851    fn deref(&self) -> &Self::Target {
2852        &self.0
2853    }
2854}
2855
2856impl DerefMut for RoleDefaultPrivileges {
2857    fn deref_mut(&mut self) -> &mut Self::Target {
2858        &mut self.0
2859    }
2860}
2861
2862impl DefaultPrivileges {
2863    /// Add a new default privilege into the set of all default privileges.
2864    pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
2865        if privilege.acl_mode.is_empty() {
2866            return;
2867        }
2868
2869        let privileges = self.privileges.entry(object).or_default();
2870        if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2871            default_privilege.acl_mode |= privilege.acl_mode;
2872        } else {
2873            privileges.insert(privilege.grantee, privilege);
2874        }
2875    }
2876
2877    /// Revoke a default privilege from the set of all default privileges.
2878    pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
2879        if let Some(privileges) = self.privileges.get_mut(object) {
2880            if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2881                default_privilege.acl_mode =
2882                    default_privilege.acl_mode.difference(privilege.acl_mode);
2883                if default_privilege.acl_mode.is_empty() {
2884                    privileges.remove(&privilege.grantee);
2885                }
2886            }
2887            if privileges.is_empty() {
2888                self.privileges.remove(object);
2889            }
2890        }
2891    }
2892
2893    /// Get the privileges that will be granted to `grantee` on all objects matching `object`,
2894    /// if any exist.
2895    pub fn get_privileges_for_grantee(
2896        &self,
2897        object: &DefaultPrivilegeObject,
2898        grantee: &RoleId,
2899    ) -> Option<&AclMode> {
2900        self.privileges
2901            .get(object)
2902            .and_then(|privileges| privileges.get(grantee))
2903            .map(|privilege| &privilege.acl_mode)
2904    }
2905
2906    /// Get all default privileges that apply to the provided object details.
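    ///
    /// A default privilege applies if it was defined for `role_id` or for the `PUBLIC` role,
    /// scoped to the given schema, to the whole database, or globally. Privileges for the same
    /// grantee are unioned and then restricted to the privileges valid for `object_type`.
    ///
    /// A minimal sketch, assuming `defaults`, `role`, `db`, and `schema` are already in scope
    /// (not compiled as a doc test):
    ///
    /// ```ignore
    /// // A default privilege defined at the database level (no specific schema)...
    /// defaults.grant(
    ///     DefaultPrivilegeObject {
    ///         role_id: role,
    ///         database_id: Some(db),
    ///         schema_id: None,
    ///         object_type: mz_sql::catalog::ObjectType::Table,
    ///     },
    ///     DefaultPrivilegeAclItem { grantee: role, acl_mode: AclMode::SELECT },
    /// );
    /// // ...is picked up for matching objects in any schema of that database.
    /// let select_defaults: Vec<_> = defaults
    ///     .get_applicable_privileges(role, Some(db), Some(schema), mz_sql::catalog::ObjectType::Table)
    ///     .collect();
    /// ```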
2907    pub fn get_applicable_privileges(
2908        &self,
2909        role_id: RoleId,
2910        database_id: Option<DatabaseId>,
2911        schema_id: Option<SchemaId>,
2912        object_type: mz_sql::catalog::ObjectType,
2913    ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
2914        // For PostgreSQL compatibility, default privileges treat all relations as being of type
2915        // table. We don't require callers to worry about that; we map their `object_type` to the
2916        // correct type for privileges here.
2917        let privilege_object_type = if object_type.is_relation() {
2918            mz_sql::catalog::ObjectType::Table
2919        } else {
2920            object_type
2921        };
2922        let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
2923
2924        // Collect all entries that apply to the provided object details.
2925        // If either `database_id` or `schema_id` is `None`, then we might end up with duplicate
2926        // entries in the array below. That's OK because we consolidate the results afterwards.
2927        [
2928            DefaultPrivilegeObject {
2929                role_id,
2930                database_id,
2931                schema_id,
2932                object_type: privilege_object_type,
2933            },
2934            DefaultPrivilegeObject {
2935                role_id,
2936                database_id,
2937                schema_id: None,
2938                object_type: privilege_object_type,
2939            },
2940            DefaultPrivilegeObject {
2941                role_id,
2942                database_id: None,
2943                schema_id: None,
2944                object_type: privilege_object_type,
2945            },
2946            DefaultPrivilegeObject {
2947                role_id: RoleId::Public,
2948                database_id,
2949                schema_id,
2950                object_type: privilege_object_type,
2951            },
2952            DefaultPrivilegeObject {
2953                role_id: RoleId::Public,
2954                database_id,
2955                schema_id: None,
2956                object_type: privilege_object_type,
2957            },
2958            DefaultPrivilegeObject {
2959                role_id: RoleId::Public,
2960                database_id: None,
2961                schema_id: None,
2962                object_type: privilege_object_type,
2963            },
2964        ]
2965        .into_iter()
2966        .filter_map(|object| self.privileges.get(&object))
2967        .flat_map(|acl_map| acl_map.values())
2968        // Consolidate privileges with a common grantee.
2969        .fold(
2970            BTreeMap::new(),
2971            |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
2972                let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
2973                *accum_acl_mode |= *acl_mode;
2974                accum
2975            },
2976        )
2977        .into_iter()
2978        // Restrict the acl_mode to only privileges valid for the provided object type. If the
2979        // default privilege has an object type of Table, then it may contain privileges valid for
2980        // tables but not other relations. If the passed-in object type is another relation, then
2981        // we need to remove any privilege that is not valid for the specified relation.
2982        .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
2983        // Filter out empty privileges.
2984        .filter(|(_, acl_mode)| !acl_mode.is_empty())
2985        .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
2986            grantee: *grantee,
2987            acl_mode,
2988        })
2989    }
2990
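    /// Iterate over all default privilege objects and, for each, the privileges granted under it.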
2991    pub fn iter(
2992        &self,
2993    ) -> impl Iterator<
2994        Item = (
2995            &DefaultPrivilegeObject,
2996            impl Iterator<Item = &DefaultPrivilegeAclItem>,
2997        ),
2998    > {
2999        self.privileges
3000            .iter()
3001            .map(|(object, acl_map)| (object, acl_map.values()))
3002    }
3003}
3004
3005#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3006pub struct ClusterConfig {
3007    pub variant: ClusterVariant,
3008    pub workload_class: Option<String>,
3009}
3010
3011impl ClusterConfig {
3012    pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
3013        match &self.variant {
3014            ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
3015            ClusterVariant::Unmanaged => None,
3016        }
3017    }
3018}
3019
3020impl From<ClusterConfig> for durable::ClusterConfig {
3021    fn from(config: ClusterConfig) -> Self {
3022        Self {
3023            variant: config.variant.into(),
3024            workload_class: config.workload_class,
3025        }
3026    }
3027}
3028
3029impl From<durable::ClusterConfig> for ClusterConfig {
3030    fn from(config: durable::ClusterConfig) -> Self {
3031        Self {
3032            variant: config.variant.into(),
3033            workload_class: config.workload_class,
3034        }
3035    }
3036}
3037
3038#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3039pub struct ClusterVariantManaged {
3040    pub size: String,
3041    pub availability_zones: Vec<String>,
3042    pub logging: ReplicaLogging,
3043    pub replication_factor: u32,
3044    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
3045    pub schedule: ClusterSchedule,
3046}
3047
3048impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3049    fn from(managed: ClusterVariantManaged) -> Self {
3050        Self {
3051            size: managed.size,
3052            availability_zones: managed.availability_zones,
3053            logging: managed.logging,
3054            replication_factor: managed.replication_factor,
3055            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3056            schedule: managed.schedule,
3057        }
3058    }
3059}
3060
3061impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3062    fn from(managed: durable::ClusterVariantManaged) -> Self {
3063        Self {
3064            size: managed.size,
3065            availability_zones: managed.availability_zones,
3066            logging: managed.logging,
3067            replication_factor: managed.replication_factor,
3068            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3069            schedule: managed.schedule,
3070        }
3071    }
3072}
3073
3074#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3075pub enum ClusterVariant {
3076    Managed(ClusterVariantManaged),
3077    Unmanaged,
3078}
3079
3080impl From<ClusterVariant> for durable::ClusterVariant {
3081    fn from(variant: ClusterVariant) -> Self {
3082        match variant {
3083            ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3084            ClusterVariant::Unmanaged => Self::Unmanaged,
3085        }
3086    }
3087}
3088
3089impl From<durable::ClusterVariant> for ClusterVariant {
3090    fn from(variant: durable::ClusterVariant) -> Self {
3091        match variant {
3092            durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3093            durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3094        }
3095    }
3096}
3097
3098impl mz_sql::catalog::CatalogDatabase for Database {
3099    fn name(&self) -> &str {
3100        &self.name
3101    }
3102
3103    fn id(&self) -> DatabaseId {
3104        self.id
3105    }
3106
3107    fn has_schemas(&self) -> bool {
3108        !self.schemas_by_name.is_empty()
3109    }
3110
3111    fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
3112        &self.schemas_by_name
3113    }
3114
3115    // `as` is ok to use to cast to a trait object.
3116    #[allow(clippy::as_conversions)]
3117    fn schemas(&self) -> Vec<&dyn CatalogSchema> {
3118        self.schemas_by_id
3119            .values()
3120            .map(|schema| schema as &dyn CatalogSchema)
3121            .collect()
3122    }
3123
3124    fn owner_id(&self) -> RoleId {
3125        self.owner_id
3126    }
3127
3128    fn privileges(&self) -> &PrivilegeMap {
3129        &self.privileges
3130    }
3131}
3132
3133impl mz_sql::catalog::CatalogSchema for Schema {
3134    fn database(&self) -> &ResolvedDatabaseSpecifier {
3135        &self.name.database
3136    }
3137
3138    fn name(&self) -> &QualifiedSchemaName {
3139        &self.name
3140    }
3141
3142    fn id(&self) -> &SchemaSpecifier {
3143        &self.id
3144    }
3145
3146    fn has_items(&self) -> bool {
3147        !self.items.is_empty()
3148    }
3149
3150    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
3151        Box::new(
3152            self.items
3153                .values()
3154                .chain(self.functions.values())
3155                .chain(self.types.values())
3156                .copied(),
3157        )
3158    }
3159
3160    fn owner_id(&self) -> RoleId {
3161        self.owner_id
3162    }
3163
3164    fn privileges(&self) -> &PrivilegeMap {
3165        &self.privileges
3166    }
3167}
3168
3169impl mz_sql::catalog::CatalogRole for Role {
3170    fn name(&self) -> &str {
3171        &self.name
3172    }
3173
3174    fn id(&self) -> RoleId {
3175        self.id
3176    }
3177
3178    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
3179        &self.membership.map
3180    }
3181
3182    fn attributes(&self) -> &RoleAttributes {
3183        &self.attributes
3184    }
3185
3186    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
3187        &self.vars.map
3188    }
3189}
3190
3191impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
3192    fn name(&self) -> &str {
3193        &self.name
3194    }
3195
3196    fn id(&self) -> NetworkPolicyId {
3197        self.id
3198    }
3199
3200    fn owner_id(&self) -> RoleId {
3201        self.owner_id
3202    }
3203
3204    fn privileges(&self) -> &PrivilegeMap {
3205        &self.privileges
3206    }
3207}
3208
3209impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
3210    fn name(&self) -> &str {
3211        &self.name
3212    }
3213
3214    fn id(&self) -> ClusterId {
3215        self.id
3216    }
3217
3218    fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
3219        &self.bound_objects
3220    }
3221
3222    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
3223        &self.replica_id_by_name_
3224    }
3225
3226    // `as` is ok to use to cast to a trait object.
3227    #[allow(clippy::as_conversions)]
3228    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
3229        self.replicas()
3230            .map(|replica| replica as &dyn CatalogClusterReplica)
3231            .collect()
3232    }
3233
3234    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
3235        self.replica(id).expect("catalog out of sync")
3236    }
3237
3238    fn owner_id(&self) -> RoleId {
3239        self.owner_id
3240    }
3241
3242    fn privileges(&self) -> &PrivilegeMap {
3243        &self.privileges
3244    }
3245
3246    fn is_managed(&self) -> bool {
3247        self.is_managed()
3248    }
3249
3250    fn managed_size(&self) -> Option<&str> {
3251        match &self.config.variant {
3252            ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
3253            _ => None,
3254        }
3255    }
3256
3257    fn schedule(&self) -> Option<&ClusterSchedule> {
3258        match &self.config.variant {
3259            ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
3260            _ => None,
3261        }
3262    }
3263
3264    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
3265        self.try_to_plan()
3266    }
3267}
3268
3269impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
3270    fn name(&self) -> &str {
3271        &self.name
3272    }
3273
3274    fn cluster_id(&self) -> ClusterId {
3275        self.cluster_id
3276    }
3277
3278    fn replica_id(&self) -> ReplicaId {
3279        self.replica_id
3280    }
3281
3282    fn owner_id(&self) -> RoleId {
3283        self.owner_id
3284    }
3285
3286    fn internal(&self) -> bool {
3287        self.config.location.internal()
3288    }
3289}
3290
3291impl mz_sql::catalog::CatalogItem for CatalogEntry {
3292    fn name(&self) -> &QualifiedItemName {
3293        self.name()
3294    }
3295
3296    fn id(&self) -> CatalogItemId {
3297        self.id()
3298    }
3299
3300    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3301        Box::new(self.global_ids())
3302    }
3303
3304    fn oid(&self) -> u32 {
3305        self.oid()
3306    }
3307
3308    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3309        self.func()
3310    }
3311
3312    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3313        self.source_desc()
3314    }
3315
3316    fn connection(
3317        &self,
3318    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3319    {
3320        Ok(self.connection()?.details.to_connection())
3321    }
3322
3323    fn create_sql(&self) -> &str {
3324        match self.item() {
3325            CatalogItem::Table(Table { create_sql, .. }) => {
3326                create_sql.as_deref().unwrap_or("<builtin>")
3327            }
3328            CatalogItem::Source(Source { create_sql, .. }) => {
3329                create_sql.as_deref().unwrap_or("<builtin>")
3330            }
3331            CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3332            CatalogItem::View(View { create_sql, .. }) => create_sql,
3333            CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3334            CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3335            CatalogItem::Type(Type { create_sql, .. }) => {
3336                create_sql.as_deref().unwrap_or("<builtin>")
3337            }
3338            CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3339            CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3340            CatalogItem::Func(_) => "<builtin>",
3341            CatalogItem::Log(_) => "<builtin>",
3342            CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
3343        }
3344    }
3345
3346    fn item_type(&self) -> SqlCatalogItemType {
3347        self.item().typ()
3348    }
3349
3350    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3351        if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3352            Some((keys, *on))
3353        } else {
3354            None
3355        }
3356    }
3357
3358    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3359        if let CatalogItem::Table(Table {
3360            data_source: TableDataSource::TableWrites { defaults },
3361            ..
3362        }) = self.item()
3363        {
3364            Some(defaults.as_slice())
3365        } else {
3366            None
3367        }
3368    }
3369
3370    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3371        if let CatalogItem::Type(Type { details, .. }) = self.item() {
3372            Some(details)
3373        } else {
3374            None
3375        }
3376    }
3377
3378    fn references(&self) -> &ResolvedIds {
3379        self.references()
3380    }
3381
3382    fn uses(&self) -> BTreeSet<CatalogItemId> {
3383        self.uses()
3384    }
3385
3386    fn referenced_by(&self) -> &[CatalogItemId] {
3387        self.referenced_by()
3388    }
3389
3390    fn used_by(&self) -> &[CatalogItemId] {
3391        self.used_by()
3392    }
3393
3394    fn subsource_details(
3395        &self,
3396    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3397        self.subsource_details()
3398    }
3399
3400    fn source_export_details(
3401        &self,
3402    ) -> Option<(
3403        CatalogItemId,
3404        &UnresolvedItemName,
3405        &SourceExportDetails,
3406        &SourceExportDataConfig<ReferencedConnection>,
3407    )> {
3408        self.source_export_details()
3409    }
3410
3411    fn is_progress_source(&self) -> bool {
3412        self.is_progress_source()
3413    }
3414
3415    fn progress_id(&self) -> Option<CatalogItemId> {
3416        self.progress_id()
3417    }
3418
3419    fn owner_id(&self) -> RoleId {
3420        self.owner_id
3421    }
3422
3423    fn privileges(&self) -> &PrivilegeMap {
3424        &self.privileges
3425    }
3426
3427    fn cluster_id(&self) -> Option<ClusterId> {
3428        self.item().cluster_id()
3429    }
3430
3431    fn at_version(
3432        &self,
3433        version: RelationVersionSelector,
3434    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3435        Box::new(CatalogCollectionEntry {
3436            entry: self.clone(),
3437            version,
3438        })
3439    }
3440
3441    fn latest_version(&self) -> Option<RelationVersion> {
3442        self.table().map(|t| t.desc.latest_version())
3443    }
3444}
3445
3446/// A single update to the catalog state.
3447#[derive(Debug)]
3448pub struct StateUpdate {
3449    pub kind: StateUpdateKind,
3450    pub ts: Timestamp,
3451    pub diff: StateDiff,
3452}
3453
3454/// The contents of a single state update.
3455///
3456/// Variants are listed in dependency order.
3457#[derive(Debug, Clone)]
3458pub enum StateUpdateKind {
3459    Role(durable::objects::Role),
3460    RoleAuth(durable::objects::RoleAuth),
3461    Database(durable::objects::Database),
3462    Schema(durable::objects::Schema),
3463    DefaultPrivilege(durable::objects::DefaultPrivilege),
3464    SystemPrivilege(MzAclItem),
3465    SystemConfiguration(durable::objects::SystemConfiguration),
3466    Cluster(durable::objects::Cluster),
3467    NetworkPolicy(durable::objects::NetworkPolicy),
3468    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3469    ClusterReplica(durable::objects::ClusterReplica),
3470    SourceReferences(durable::objects::SourceReferences),
3471    SystemObjectMapping(durable::objects::SystemObjectMapping),
3472    // Temporary items are not actually updated via the durable catalog, but this variant allows
3473    // us to model them the same way as all other items.
3474    TemporaryItem(TemporaryItem),
3475    Item(durable::objects::Item),
3476    Comment(durable::objects::Comment),
3477    AuditLog(durable::objects::AuditLog),
3478    // Storage updates.
3479    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3480    UnfinalizedShard(durable::objects::UnfinalizedShard),
3481}
3482
3483/// Valid diffs for catalog state updates.
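///
/// A `StateDiff` corresponds to a [`Diff`] of `1` (addition) or `-1` (retraction). A minimal
/// sketch of the round trip (not compiled as a doc test):
///
/// ```ignore
/// assert_eq!(Diff::from(StateDiff::Addition), Diff::ONE);
/// assert_eq!(StateDiff::try_from(Diff::MINUS_ONE), Ok(StateDiff::Retraction));
/// ```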
3484#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
3485pub enum StateDiff {
3486    Retraction,
3487    Addition,
3488}
3489
3490impl From<StateDiff> for Diff {
3491    fn from(diff: StateDiff) -> Self {
3492        match diff {
3493            StateDiff::Retraction => Diff::MINUS_ONE,
3494            StateDiff::Addition => Diff::ONE,
3495        }
3496    }
3497}

3498impl TryFrom<Diff> for StateDiff {
3499    type Error = String;
3500
3501    fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3502        match diff {
3503            Diff::MINUS_ONE => Ok(Self::Retraction),
3504            Diff::ONE => Ok(Self::Addition),
3505            diff => Err(format!("invalid diff {diff}")),
3506        }
3507    }
3508}
3509
3510/// Information needed to process an update to a temporary item.
3511#[derive(Debug, Clone)]
3512pub struct TemporaryItem {
3513    pub id: CatalogItemId,
3514    pub oid: u32,
3515    pub name: QualifiedItemName,
3516    pub item: CatalogItem,
3517    pub owner_id: RoleId,
3518    pub privileges: PrivilegeMap,
3519}
3520
3521impl From<CatalogEntry> for TemporaryItem {
3522    fn from(entry: CatalogEntry) -> Self {
3523        TemporaryItem {
3524            id: entry.id,
3525            oid: entry.oid,
3526            name: entry.name,
3527            item: entry.item,
3528            owner_id: entry.owner_id,
3529            privileges: entry.privileges,
3530        }
3531    }
3532}
3533
3534/// The same as [`StateUpdateKind`], but without `TemporaryItem` so we can derive [`Ord`].
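///
/// Converting from [`StateUpdateKind`] is fallible: a `TemporaryItem` update has no bootstrap
/// equivalent, so the `TryFrom` impl below returns it as the error.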
3535#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
3536pub enum BootstrapStateUpdateKind {
3537    Role(durable::objects::Role),
3538    RoleAuth(durable::objects::RoleAuth),
3539    Database(durable::objects::Database),
3540    Schema(durable::objects::Schema),
3541    DefaultPrivilege(durable::objects::DefaultPrivilege),
3542    SystemPrivilege(MzAclItem),
3543    SystemConfiguration(durable::objects::SystemConfiguration),
3544    Cluster(durable::objects::Cluster),
3545    NetworkPolicy(durable::objects::NetworkPolicy),
3546    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3547    ClusterReplica(durable::objects::ClusterReplica),
3548    SourceReferences(durable::objects::SourceReferences),
3549    SystemObjectMapping(durable::objects::SystemObjectMapping),
3550    Item(durable::objects::Item),
3551    Comment(durable::objects::Comment),
3552    AuditLog(durable::objects::AuditLog),
3553    // Storage updates.
3554    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3555    UnfinalizedShard(durable::objects::UnfinalizedShard),
3556}
3557
3558impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3559    fn from(value: BootstrapStateUpdateKind) -> Self {
3560        match value {
3561            BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3562            BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3563            BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3564            BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3565            BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3566                StateUpdateKind::DefaultPrivilege(kind)
3567            }
3568            BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3569                StateUpdateKind::SystemPrivilege(kind)
3570            }
3571            BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3572                StateUpdateKind::SystemConfiguration(kind)
3573            }
3574            BootstrapStateUpdateKind::SourceReferences(kind) => {
3575                StateUpdateKind::SourceReferences(kind)
3576            }
3577            BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3578            BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3579            BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3580                StateUpdateKind::IntrospectionSourceIndex(kind)
3581            }
3582            BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3583            BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3584                StateUpdateKind::SystemObjectMapping(kind)
3585            }
3586            BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3587            BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3588            BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3589            BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3590                StateUpdateKind::StorageCollectionMetadata(kind)
3591            }
3592            BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3593                StateUpdateKind::UnfinalizedShard(kind)
3594            }
3595        }
3596    }
3597}
3598
3599impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3600    type Error = TemporaryItem;
3601
3602    fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3603        match value {
3604            StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3605            StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3606            StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3607            StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3608            StateUpdateKind::DefaultPrivilege(kind) => {
3609                Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3610            }
3611            StateUpdateKind::SystemPrivilege(kind) => {
3612                Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3613            }
3614            StateUpdateKind::SystemConfiguration(kind) => {
3615                Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3616            }
3617            StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3618            StateUpdateKind::NetworkPolicy(kind) => {
3619                Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3620            }
3621            StateUpdateKind::IntrospectionSourceIndex(kind) => {
3622                Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3623            }
3624            StateUpdateKind::ClusterReplica(kind) => {
3625                Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3626            }
3627            StateUpdateKind::SourceReferences(kind) => {
3628                Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3629            }
3630            StateUpdateKind::SystemObjectMapping(kind) => {
3631                Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3632            }
3633            StateUpdateKind::TemporaryItem(kind) => Err(kind),
3634            StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3635            StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3636            StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3637            StateUpdateKind::StorageCollectionMetadata(kind) => {
3638                Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3639            }
3640            StateUpdateKind::UnfinalizedShard(kind) => {
3641                Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3642            }
3643        }
3644    }
3645}