Skip to main content

mz_catalog/memory/
objects.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The current types used by the in-memory Catalog. Many of the objects in this module are
11//! extremely similar to the objects found in [`crate::durable::objects`] but in a format that is
12//! easier consumed by higher layers.
13
14use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_compute_client::logging::LogVariant;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_compute_types::plan::Plan as ComputePlan;
26use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
27use mz_controller_types::{ClusterId, ReplicaId};
28use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
29use mz_ore::collections::CollectionExt;
30use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
31use mz_repr::network_policy_id::NetworkPolicyId;
32use mz_repr::optimize::OptimizerFeatureOverrides;
33use mz_repr::refresh_schedule::RefreshSchedule;
34use mz_repr::role_id::RoleId;
35use mz_repr::{
36    CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
37    RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
38};
39use mz_sql::ast::display::AstDisplay;
40use mz_sql::ast::{
41    ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
42    UnresolvedItemName, Value, WithOptionValue,
43};
44use mz_sql::catalog::{
45    CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
46    CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogType,
47    CatalogTypeDetails, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference,
48    RoleAttributes, RoleMembership, RoleVars, SystemObjectType,
49};
50use mz_sql::names::{
51    Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
52    QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
53};
54use mz_sql::plan::{
55    AutoScalingStrategy, ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig,
56    ConnectionDetails, CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant,
57    CreateSourcePlan, HirRelationExpr, NetworkPolicyRule, OnTimeoutAction, PlanError,
58    WebhookBodyFormat, WebhookHeaders, WebhookValidation,
59};
60use mz_sql::rbac;
61use mz_sql::session::vars::OwnedVarInput;
62use mz_storage_client::controller::IntrospectionType;
63use mz_storage_types::connections::inline::ReferencedConnection;
64use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
65use mz_storage_types::sources::load_generator::LoadGenerator;
66use mz_storage_types::sources::{
67    GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
68    SourceExportDetails, Timeline,
69};
70use mz_transform::dataflow::DataflowMetainfo;
71use mz_transform::notice::OptimizerNotice;
72use serde::ser::SerializeSeq;
73use serde::{Deserialize, Serialize};
74use timely::progress::Antichain;
75use tracing::debug;
76
77use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
78use crate::durable;
79use crate::durable::objects::item_type;
80
81/// Used to update `self` from the input value while consuming the input value.
82pub trait UpdateFrom<T>: From<T> {
83    fn update_from(&mut self, from: T);
84}
85
86#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
87pub struct Database {
88    pub name: String,
89    pub id: DatabaseId,
90    pub oid: u32,
91    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
92    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
93    pub schemas_by_name: BTreeMap<String, SchemaId>,
94    pub owner_id: RoleId,
95    pub privileges: PrivilegeMap,
96}
97
98impl From<Database> for durable::Database {
99    fn from(database: Database) -> durable::Database {
100        durable::Database {
101            id: database.id,
102            oid: database.oid,
103            name: database.name,
104            owner_id: database.owner_id,
105            privileges: database.privileges.into_all_values().collect(),
106        }
107    }
108}
109
110impl From<durable::Database> for Database {
111    fn from(
112        durable::Database {
113            id,
114            oid,
115            name,
116            owner_id,
117            privileges,
118        }: durable::Database,
119    ) -> Database {
120        Database {
121            id,
122            oid,
123            schemas_by_id: BTreeMap::new(),
124            schemas_by_name: BTreeMap::new(),
125            name,
126            owner_id,
127            privileges: PrivilegeMap::from_mz_acl_items(privileges),
128        }
129    }
130}
131
132impl UpdateFrom<durable::Database> for Database {
133    fn update_from(
134        &mut self,
135        durable::Database {
136            id,
137            oid,
138            name,
139            owner_id,
140            privileges,
141        }: durable::Database,
142    ) {
143        self.id = id;
144        self.oid = oid;
145        self.name = name;
146        self.owner_id = owner_id;
147        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
148    }
149}
150
151#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
152pub struct Schema {
153    pub name: QualifiedSchemaName,
154    pub id: SchemaSpecifier,
155    pub oid: u32,
156    pub items: BTreeMap<String, CatalogItemId>,
157    pub functions: BTreeMap<String, CatalogItemId>,
158    pub types: BTreeMap<String, CatalogItemId>,
159    pub owner_id: RoleId,
160    pub privileges: PrivilegeMap,
161}
162
163impl From<Schema> for durable::Schema {
164    fn from(schema: Schema) -> durable::Schema {
165        durable::Schema {
166            id: schema.id.into(),
167            oid: schema.oid,
168            name: schema.name.schema,
169            database_id: schema.name.database.id(),
170            owner_id: schema.owner_id,
171            privileges: schema.privileges.into_all_values().collect(),
172        }
173    }
174}
175
176impl From<durable::Schema> for Schema {
177    fn from(
178        durable::Schema {
179            id,
180            oid,
181            name,
182            database_id,
183            owner_id,
184            privileges,
185        }: durable::Schema,
186    ) -> Schema {
187        Schema {
188            name: QualifiedSchemaName {
189                database: database_id.into(),
190                schema: name,
191            },
192            id: id.into(),
193            oid,
194            items: BTreeMap::new(),
195            functions: BTreeMap::new(),
196            types: BTreeMap::new(),
197            owner_id,
198            privileges: PrivilegeMap::from_mz_acl_items(privileges),
199        }
200    }
201}
202
203impl UpdateFrom<durable::Schema> for Schema {
204    fn update_from(
205        &mut self,
206        durable::Schema {
207            id,
208            oid,
209            name,
210            database_id,
211            owner_id,
212            privileges,
213        }: durable::Schema,
214    ) {
215        self.name = QualifiedSchemaName {
216            database: database_id.into(),
217            schema: name,
218        };
219        self.id = id.into();
220        self.oid = oid;
221        self.owner_id = owner_id;
222        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
223    }
224}
225
226#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
227pub struct Role {
228    pub name: String,
229    pub id: RoleId,
230    pub oid: u32,
231    pub attributes: RoleAttributes,
232    pub membership: RoleMembership,
233    pub vars: RoleVars,
234}
235
236impl Role {
237    pub fn is_user(&self) -> bool {
238        self.id.is_user()
239    }
240
241    pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
242        self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
243    }
244}
245
246impl From<Role> for durable::Role {
247    fn from(role: Role) -> durable::Role {
248        durable::Role {
249            id: role.id,
250            oid: role.oid,
251            name: role.name,
252            attributes: role.attributes,
253            membership: role.membership,
254            vars: role.vars,
255        }
256    }
257}
258
259impl From<durable::Role> for Role {
260    fn from(
261        durable::Role {
262            id,
263            oid,
264            name,
265            attributes,
266            membership,
267            vars,
268        }: durable::Role,
269    ) -> Self {
270        Role {
271            name,
272            id,
273            oid,
274            attributes,
275            membership,
276            vars,
277        }
278    }
279}
280
281impl UpdateFrom<durable::Role> for Role {
282    fn update_from(
283        &mut self,
284        durable::Role {
285            id,
286            oid,
287            name,
288            attributes,
289            membership,
290            vars,
291        }: durable::Role,
292    ) {
293        self.id = id;
294        self.oid = oid;
295        self.name = name;
296        self.attributes = attributes;
297        self.membership = membership;
298        self.vars = vars;
299    }
300}
301
302#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
303pub struct RoleAuth {
304    pub role_id: RoleId,
305    pub password_hash: Option<String>,
306    pub updated_at: u64,
307}
308
309impl From<RoleAuth> for durable::RoleAuth {
310    fn from(role_auth: RoleAuth) -> durable::RoleAuth {
311        durable::RoleAuth {
312            role_id: role_auth.role_id,
313            password_hash: role_auth.password_hash,
314            updated_at: role_auth.updated_at,
315        }
316    }
317}
318
319impl From<durable::RoleAuth> for RoleAuth {
320    fn from(
321        durable::RoleAuth {
322            role_id,
323            password_hash,
324            updated_at,
325        }: durable::RoleAuth,
326    ) -> RoleAuth {
327        RoleAuth {
328            role_id,
329            password_hash,
330            updated_at,
331        }
332    }
333}
334
335impl UpdateFrom<durable::RoleAuth> for RoleAuth {
336    fn update_from(&mut self, from: durable::RoleAuth) {
337        self.role_id = from.role_id;
338        self.password_hash = from.password_hash;
339        self.updated_at = from.updated_at;
340    }
341}
342
343#[derive(Debug, Serialize, Clone, PartialEq)]
344pub struct Cluster {
345    pub name: String,
346    pub id: ClusterId,
347    pub config: ClusterConfig,
348    #[serde(skip)]
349    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
350    /// Objects bound to this cluster. Does not include introspection source
351    /// indexes.
352    pub bound_objects: BTreeSet<CatalogItemId>,
353    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
354    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
355    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
356    pub owner_id: RoleId,
357    pub privileges: PrivilegeMap,
358}
359
360impl Cluster {
361    /// The role of the cluster. Currently used to set alert severity.
362    pub fn role(&self) -> ClusterRole {
363        // NOTE - These roles power monitoring systems. Do not change
364        // them without talking to the cloud or observability groups.
365        if self.name == MZ_SYSTEM_CLUSTER.name {
366            ClusterRole::SystemCritical
367        } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
368            ClusterRole::System
369        } else {
370            ClusterRole::User
371        }
372    }
373
374    /// Returns `true` if the cluster is a managed cluster.
375    pub fn is_managed(&self) -> bool {
376        matches!(self.config.variant, ClusterVariant::Managed { .. })
377    }
378
379    /// Lists the user replicas, which are those that do not have the internal flag set.
380    pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
381        self.replicas().filter(|r| !r.config.location.internal())
382    }
383
384    /// Lists all replicas in the cluster
385    pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
386        self.replicas_by_id_.values()
387    }
388
389    /// Lookup a replica by ID.
390    pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
391        self.replicas_by_id_.get(&replica_id)
392    }
393
394    /// Lookup a replica ID by name.
395    pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
396        self.replica_id_by_name_.get(name).copied()
397    }
398
399    /// Returns the availability zones of this cluster, if they exist.
400    pub fn availability_zones(&self) -> Option<&[String]> {
401        match &self.config.variant {
402            ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
403            ClusterVariant::Unmanaged => None,
404        }
405    }
406
407    pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
408        let name = self.name.clone();
409        let variant = match &self.config.variant {
410            ClusterVariant::Managed(ClusterVariantManaged {
411                size,
412                availability_zones,
413                logging,
414                replication_factor,
415                optimizer_feature_overrides,
416                schedule,
417                // Not surfaced in the create plan: these are controller-managed
418                // runtime state and (for the policy) a separate SQL surface.
419                auto_scaling_strategy: _,
420                reconfiguration: _,
421                burst: _,
422            }) => {
423                let introspection = match logging {
424                    ReplicaLogging {
425                        log_logging,
426                        interval: Some(interval),
427                    } => Some(ComputeReplicaIntrospectionConfig {
428                        debugging: *log_logging,
429                        interval: interval.clone(),
430                    }),
431                    ReplicaLogging {
432                        log_logging: _,
433                        interval: None,
434                    } => None,
435                };
436                let compute = ComputeReplicaConfig { introspection };
437                CreateClusterVariant::Managed(CreateClusterManagedPlan {
438                    replication_factor: replication_factor.clone(),
439                    size: size.clone(),
440                    availability_zones: availability_zones.clone(),
441                    compute,
442                    optimizer_feature_overrides: optimizer_feature_overrides.clone(),
443                    schedule: schedule.clone(),
444                })
445            }
446            ClusterVariant::Unmanaged => {
447                // Unmanaged clusters are deprecated, so hopefully we can remove
448                // them before we have to implement this.
449                return Err(PlanError::Unsupported {
450                    feature: "SHOW CREATE for unmanaged clusters".to_string(),
451                    discussion_no: None,
452                });
453            }
454        };
455        let workload_class = self.config.workload_class.clone();
456        Ok(CreateClusterPlan {
457            name,
458            variant,
459            workload_class,
460        })
461    }
462}
463
464impl From<Cluster> for durable::Cluster {
465    fn from(cluster: Cluster) -> durable::Cluster {
466        durable::Cluster {
467            id: cluster.id,
468            name: cluster.name,
469            owner_id: cluster.owner_id,
470            privileges: cluster.privileges.into_all_values().collect(),
471            config: cluster.config.into(),
472        }
473    }
474}
475
476impl From<durable::Cluster> for Cluster {
477    fn from(
478        durable::Cluster {
479            id,
480            name,
481            owner_id,
482            privileges,
483            config,
484        }: durable::Cluster,
485    ) -> Self {
486        Cluster {
487            name: name.clone(),
488            id,
489            bound_objects: BTreeSet::new(),
490            log_indexes: BTreeMap::new(),
491            replica_id_by_name_: BTreeMap::new(),
492            replicas_by_id_: BTreeMap::new(),
493            owner_id,
494            privileges: PrivilegeMap::from_mz_acl_items(privileges),
495            config: config.into(),
496        }
497    }
498}
499
500impl UpdateFrom<durable::Cluster> for Cluster {
501    fn update_from(
502        &mut self,
503        durable::Cluster {
504            id,
505            name,
506            owner_id,
507            privileges,
508            config,
509        }: durable::Cluster,
510    ) {
511        self.id = id;
512        self.name = name;
513        self.owner_id = owner_id;
514        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
515        self.config = config.into();
516    }
517}
518
519#[derive(Debug, Serialize, Clone, PartialEq)]
520pub struct ClusterReplica {
521    pub name: String,
522    pub cluster_id: ClusterId,
523    pub replica_id: ReplicaId,
524    pub config: ReplicaConfig,
525    pub owner_id: RoleId,
526}
527
528impl From<ClusterReplica> for durable::ClusterReplica {
529    fn from(replica: ClusterReplica) -> durable::ClusterReplica {
530        durable::ClusterReplica {
531            cluster_id: replica.cluster_id,
532            replica_id: replica.replica_id,
533            name: replica.name,
534            config: replica.config.into(),
535            owner_id: replica.owner_id,
536        }
537    }
538}
539
540#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
541pub struct ClusterReplicaProcessStatus {
542    pub status: ClusterStatus,
543    pub time: DateTime<Utc>,
544}
545
546#[derive(Debug, Serialize, Clone, PartialEq)]
547pub struct SourceReferences {
548    pub updated_at: u64,
549    pub references: Vec<SourceReference>,
550}
551
552#[derive(Debug, Serialize, Clone, PartialEq)]
553pub struct SourceReference {
554    pub name: String,
555    pub namespace: Option<String>,
556    pub columns: Vec<String>,
557}
558
559impl From<SourceReference> for durable::SourceReference {
560    fn from(source_reference: SourceReference) -> durable::SourceReference {
561        durable::SourceReference {
562            name: source_reference.name,
563            namespace: source_reference.namespace,
564            columns: source_reference.columns,
565        }
566    }
567}
568
569impl SourceReferences {
570    pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
571        durable::SourceReferences {
572            source_id,
573            updated_at: self.updated_at,
574            references: self.references.into_iter().map(Into::into).collect(),
575        }
576    }
577}
578
579impl From<durable::SourceReference> for SourceReference {
580    fn from(source_reference: durable::SourceReference) -> SourceReference {
581        SourceReference {
582            name: source_reference.name,
583            namespace: source_reference.namespace,
584            columns: source_reference.columns,
585        }
586    }
587}
588
589impl From<durable::SourceReferences> for SourceReferences {
590    fn from(source_references: durable::SourceReferences) -> SourceReferences {
591        SourceReferences {
592            updated_at: source_references.updated_at,
593            references: source_references
594                .references
595                .into_iter()
596                .map(|source_reference| source_reference.into())
597                .collect(),
598        }
599    }
600}
601
602impl From<mz_sql::plan::SourceReference> for SourceReference {
603    fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
604        SourceReference {
605            name: source_reference.name,
606            namespace: source_reference.namespace,
607            columns: source_reference.columns,
608        }
609    }
610}
611
612impl From<mz_sql::plan::SourceReferences> for SourceReferences {
613    fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
614        SourceReferences {
615            updated_at: source_references.updated_at,
616            references: source_references
617                .references
618                .into_iter()
619                .map(|source_reference| source_reference.into())
620                .collect(),
621        }
622    }
623}
624
625impl From<SourceReferences> for mz_sql::plan::SourceReferences {
626    fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
627        mz_sql::plan::SourceReferences {
628            updated_at: source_references.updated_at,
629            references: source_references
630                .references
631                .into_iter()
632                .map(|source_reference| source_reference.into())
633                .collect(),
634        }
635    }
636}
637
638impl From<SourceReference> for mz_sql::plan::SourceReference {
639    fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
640        mz_sql::plan::SourceReference {
641            name: source_reference.name,
642            namespace: source_reference.namespace,
643            columns: source_reference.columns,
644        }
645    }
646}
647
648#[derive(Clone, Debug, Serialize)]
649pub struct CatalogEntry {
650    pub item: CatalogItem,
651    #[serde(skip)]
652    pub referenced_by: Vec<CatalogItemId>,
653    // TODO(database-issues#7922)––this should have an invariant tied to it that all
654    // dependents (i.e. entries in this field) have IDs greater than this
655    // entry's ID.
656    #[serde(skip)]
657    pub used_by: Vec<CatalogItemId>,
658    pub id: CatalogItemId,
659    pub oid: u32,
660    pub name: QualifiedItemName,
661    pub owner_id: RoleId,
662    pub privileges: PrivilegeMap,
663}
664
665/// A [`CatalogEntry`] that is associated with a specific "collection" of data.
666/// A single item in the catalog may be associated with multiple "collections".
667///
668/// Here "collection" generally means a pTVC, e.g. a Persist Shard, an Index, a
669/// currently running dataflow, etc.
670///
671/// Items in the Catalog have a stable name -> ID mapping, in other words for
672/// the entire lifetime of an object its [`CatalogItemId`] will _never_ change.
673/// Similarly, we need to maintain a stable mapping from [`GlobalId`] to pTVC.
674/// This presents a challenge when `ALTER`-ing an object, e.g. adding columns
675/// to a table. We can't just change the schema of the underlying Persist Shard
676/// because that would be rebinding the [`GlobalId`] of the pTVC. Instead we
677/// allocate a new [`GlobalId`] to refer to the new version of the table, and
678/// then the [`CatalogEntry`] tracks the [`GlobalId`] for each version.
679#[derive(Clone, Debug)]
680pub struct CatalogCollectionEntry {
681    pub entry: CatalogEntry,
682    pub version: RelationVersionSelector,
683}
684
685impl CatalogCollectionEntry {
686    pub fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
687        self.item().relation_desc(self.version)
688    }
689}
690
691impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
692    fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
693        self.item().relation_desc(self.version)
694    }
695
696    fn global_id(&self) -> GlobalId {
697        self.entry
698            .item()
699            .global_id_for_version(self.version)
700            .expect("catalog corruption, missing version!")
701    }
702}
703
704impl Deref for CatalogCollectionEntry {
705    type Target = CatalogEntry;
706
707    fn deref(&self) -> &CatalogEntry {
708        &self.entry
709    }
710}
711
712impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
713    fn name(&self) -> &QualifiedItemName {
714        self.entry.name()
715    }
716
717    fn id(&self) -> CatalogItemId {
718        self.entry.id()
719    }
720
721    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
722        Box::new(self.entry.global_ids())
723    }
724
725    fn oid(&self) -> u32 {
726        self.entry.oid()
727    }
728
729    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
730        self.entry.func()
731    }
732
733    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
734        self.entry.source_desc()
735    }
736
737    fn connection(
738        &self,
739    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
740    {
741        mz_sql::catalog::CatalogItem::connection(&self.entry)
742    }
743
744    fn create_sql(&self) -> &str {
745        self.entry.create_sql()
746    }
747
748    fn item_type(&self) -> SqlCatalogItemType {
749        self.entry.item_type()
750    }
751
752    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
753        self.entry.index_details()
754    }
755
756    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
757        self.entry.writable_table_details()
758    }
759
760    fn replacement_target(&self) -> Option<CatalogItemId> {
761        self.entry.replacement_target()
762    }
763
764    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
765        self.entry.type_details()
766    }
767
768    fn references(&self) -> &ResolvedIds {
769        self.entry.references()
770    }
771
772    fn uses(&self) -> BTreeSet<CatalogItemId> {
773        self.entry.uses()
774    }
775
776    fn referenced_by(&self) -> &[CatalogItemId] {
777        self.entry.referenced_by()
778    }
779
780    fn used_by(&self) -> &[CatalogItemId] {
781        self.entry.used_by()
782    }
783
784    fn subsource_details(
785        &self,
786    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
787        self.entry.subsource_details()
788    }
789
790    fn source_export_details(
791        &self,
792    ) -> Option<(
793        CatalogItemId,
794        &UnresolvedItemName,
795        &SourceExportDetails,
796        &SourceExportDataConfig<ReferencedConnection>,
797    )> {
798        self.entry.source_export_details()
799    }
800
801    fn is_progress_source(&self) -> bool {
802        self.entry.is_progress_source()
803    }
804
805    fn progress_id(&self) -> Option<CatalogItemId> {
806        self.entry.progress_id()
807    }
808
809    fn owner_id(&self) -> RoleId {
810        *self.entry.owner_id()
811    }
812
813    fn privileges(&self) -> &PrivilegeMap {
814        self.entry.privileges()
815    }
816
817    fn cluster_id(&self) -> Option<ClusterId> {
818        self.entry.item().cluster_id()
819    }
820
821    fn at_version(
822        &self,
823        version: RelationVersionSelector,
824    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
825        Box::new(CatalogCollectionEntry {
826            entry: self.entry.clone(),
827            version,
828        })
829    }
830
831    fn latest_version(&self) -> Option<RelationVersion> {
832        self.entry.latest_version()
833    }
834}
835
836#[derive(Debug, Clone, Serialize)]
837pub enum CatalogItem {
838    Table(Table),
839    Source(Source),
840    Log(Log),
841    View(View),
842    MaterializedView(MaterializedView),
843    Sink(Sink),
844    Index(Index),
845    Type(Type),
846    Func(Func),
847    Secret(Secret),
848    Connection(Connection),
849}
850
851impl From<CatalogEntry> for durable::Item {
852    fn from(entry: CatalogEntry) -> durable::Item {
853        let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
854        durable::Item {
855            id: entry.id,
856            oid: entry.oid,
857            global_id,
858            schema_id: entry.name.qualifiers.schema_spec.into(),
859            name: entry.name.item,
860            create_sql,
861            owner_id: entry.owner_id,
862            privileges: entry.privileges.into_all_values().collect(),
863            extra_versions,
864        }
865    }
866}
867
868#[derive(Debug, Clone, Serialize)]
869pub struct Table {
870    /// Parse-able SQL that defines this table.
871    pub create_sql: Option<String>,
872    /// [`VersionedRelationDesc`] of this table, derived from the `create_sql`.
873    pub desc: VersionedRelationDesc,
874    /// Versions of this table, and the [`GlobalId`]s that refer to them.
875    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
876    pub collections: BTreeMap<RelationVersion, GlobalId>,
877    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
878    #[serde(skip)]
879    pub conn_id: Option<ConnectionId>,
880    /// Other catalog objects referenced by this table, e.g. custom types.
881    pub resolved_ids: ResolvedIds,
882    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
883    pub custom_logical_compaction_window: Option<CompactionWindow>,
884    /// Whether the table's logical compaction window is controlled by the ['metrics_retention']
885    /// session variable.
886    ///
887    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
888    pub is_retained_metrics_object: bool,
889    /// Where data for this table comes from, e.g. `INSERT` statements or an upstream source.
890    pub data_source: TableDataSource,
891}
892
893impl Table {
894    pub fn timeline(&self) -> Timeline {
895        match &self.data_source {
896            // The Coordinator controls insertions for writable tables
897            // (including system tables), so they are realtime.
898            TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
899            TableDataSource::DataSource { timeline, .. } => timeline.clone(),
900        }
901    }
902
903    /// Returns all of the [`GlobalId`]s that this [`Table`] can be referenced by.
904    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
905        self.collections.values().copied()
906    }
907
908    /// Returns the latest [`GlobalId`] for this [`Table`] which should be used for writes.
909    pub fn global_id_writes(&self) -> GlobalId {
910        *self
911            .collections
912            .last_key_value()
913            .expect("at least one version of a table")
914            .1
915    }
916
917    /// Returns all of the collections and their [`RelationDesc`]s associated with this [`Table`].
918    pub fn collection_descs(
919        &self,
920    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
921        self.collections.iter().map(|(version, gid)| {
922            let desc = self
923                .desc
924                .at_version(RelationVersionSelector::Specific(*version));
925            (*gid, *version, desc)
926        })
927    }
928
929    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
930    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
931        let (version, _gid) = self
932            .collections
933            .iter()
934            .find(|(_version, gid)| *gid == id)
935            .expect("GlobalId to exist");
936        self.desc
937            .at_version(RelationVersionSelector::Specific(*version))
938    }
939}
940
941#[derive(Clone, Debug, Serialize)]
942pub enum TableDataSource {
943    /// The table owns data created via INSERT/UPDATE/DELETE statements.
944    TableWrites {
945        #[serde(skip)]
946        defaults: Vec<Expr<Aug>>,
947    },
948
949    /// The table receives its data from the identified `DataSourceDesc`.
950    /// This table type does not support INSERT/UPDATE/DELETE statements.
951    DataSource {
952        desc: DataSourceDesc,
953        timeline: Timeline,
954    },
955}
956
957#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
958pub enum DataSourceDesc {
959    /// Receives data from an external system
960    Ingestion {
961        desc: SourceDesc<ReferencedConnection>,
962        cluster_id: ClusterId,
963    },
964    /// Receives data from an external system
965    OldSyntaxIngestion {
966        desc: SourceDesc<ReferencedConnection>,
967        cluster_id: ClusterId,
968        // If we're dealing with an old syntax ingestion the progress id will be some other collection
969        // and the ingestion itself will have the data from an external reference
970        progress_subsource: CatalogItemId,
971        data_config: SourceExportDataConfig<ReferencedConnection>,
972        details: SourceExportDetails,
973    },
974    /// This source receives its data from the identified ingestion,
975    /// specifically the output identified by `external_reference`.
976    /// N.B. that `external_reference` should not be used to identify
977    /// anything downstream of purification, as the purification process
978    /// encodes source-specific identifiers into the `details` struct.
979    /// The `external_reference` field is only used here for displaying
980    /// human-readable names in system tables.
981    IngestionExport {
982        ingestion_id: CatalogItemId,
983        external_reference: UnresolvedItemName,
984        details: SourceExportDetails,
985        data_config: SourceExportDataConfig<ReferencedConnection>,
986    },
987    /// Receives introspection data from an internal system
988    Introspection(IntrospectionType),
989    /// Receives data from the source's reclocking/remapping operations.
990    Progress,
991    /// Receives data from HTTP requests.
992    Webhook {
993        /// Optional components used to validation a webhook request.
994        validate_using: Option<WebhookValidation>,
995        /// Describes how we deserialize the body of a webhook request.
996        body_format: WebhookBodyFormat,
997        /// Describes whether or not to include headers and how to map them.
998        headers: WebhookHeaders,
999        /// The cluster which this source is associated with.
1000        cluster_id: ClusterId,
1001    },
1002    /// Exposes the contents of the catalog shard.
1003    Catalog,
1004}
1005
1006impl From<IntrospectionType> for DataSourceDesc {
1007    fn from(typ: IntrospectionType) -> Self {
1008        Self::Introspection(typ)
1009    }
1010}
1011
1012impl DataSourceDesc {
1013    /// The key and value formats of the data source.
1014    pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1015        match &self {
1016            DataSourceDesc::Ingestion { .. } => (None, None),
1017            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1018                match &data_config.encoding.as_ref() {
1019                    Some(encoding) => match &encoding.key {
1020                        Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1021                        None => (None, Some(encoding.value.type_())),
1022                    },
1023                    None => (None, None),
1024                }
1025            }
1026            DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1027                Some(encoding) => match &encoding.key {
1028                    Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1029                    None => (None, Some(encoding.value.type_())),
1030                },
1031                None => (None, None),
1032            },
1033            DataSourceDesc::Introspection(_)
1034            | DataSourceDesc::Webhook { .. }
1035            | DataSourceDesc::Progress
1036            | DataSourceDesc::Catalog => (None, None),
1037        }
1038    }
1039
1040    /// Envelope of the data source.
1041    pub fn envelope(&self) -> Option<&str> {
1042        // Note how "none"/"append-only" is different from `None`. Source
1043        // sources don't have an envelope (internal logs, for example), while
1044        // other sources have an envelope that we call the "NONE"-envelope.
1045
1046        fn envelope_string(envelope: &SourceEnvelope) -> &str {
1047            match envelope {
1048                SourceEnvelope::None(_) => "none",
1049                SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1050                    mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1051                    mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1052                        // NOTE(aljoscha): Should we somehow mark that this is
1053                        // using upsert internally? See note above about
1054                        // DEBEZIUM.
1055                        "debezium"
1056                    }
1057                    mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1058                        "upsert-value-err-inline"
1059                    }
1060                },
1061                SourceEnvelope::CdcV2 => {
1062                    // TODO(aljoscha): Should we even report this? It's
1063                    // currently not exposed.
1064                    "materialize"
1065                }
1066            }
1067        }
1068
1069        match self {
1070            // NOTE(aljoscha): We could move the block for ingestions into
1071            // `SourceEnvelope` itself, but that one feels more like an internal
1072            // thing and adapter should own how we represent envelopes as a
1073            // string? It would not be hard to convince me otherwise, though.
1074            DataSourceDesc::Ingestion { .. } => None,
1075            DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1076                Some(envelope_string(&data_config.envelope))
1077            }
1078            DataSourceDesc::IngestionExport { data_config, .. } => {
1079                Some(envelope_string(&data_config.envelope))
1080            }
1081            DataSourceDesc::Introspection(_)
1082            | DataSourceDesc::Webhook { .. }
1083            | DataSourceDesc::Progress
1084            | DataSourceDesc::Catalog => None,
1085        }
1086    }
1087}
1088
1089#[derive(Debug, Clone, Serialize)]
1090pub struct Source {
1091    /// Parse-able SQL that defines this table.
1092    pub create_sql: Option<String>,
1093    /// [`GlobalId`] used to reference this source from outside the catalog.
1094    pub global_id: GlobalId,
1095    // TODO: Unskip: currently blocked on some inner BTreeMap<X, _> problems.
1096    #[serde(skip)]
1097    pub data_source: DataSourceDesc,
1098    /// [`RelationDesc`] of this source, derived from the `create_sql`.
1099    pub desc: RelationDesc,
1100    /// The timeline this source exists on.
1101    pub timeline: Timeline,
1102    /// Other catalog objects referenced by this table, e.g. custom types.
1103    pub resolved_ids: ResolvedIds,
1104    /// This value is ignored for subsources, i.e. for
1105    /// [`DataSourceDesc::IngestionExport`]. Instead, it uses the primary
1106    /// sources logical compaction window.
1107    pub custom_logical_compaction_window: Option<CompactionWindow>,
1108    /// Whether the source's logical compaction window is controlled by
1109    /// METRICS_RETENTION
1110    pub is_retained_metrics_object: bool,
1111}
1112
1113impl Source {
1114    /// Creates a new `Source`.
1115    ///
1116    /// # Panics
1117    /// - If an ingestion-based plan is not given a cluster_id.
1118    /// - If a non-ingestion-based source has a defined cluster config in its plan.
1119    /// - If a non-ingestion-based source is given a cluster_id.
1120    pub fn new(
1121        plan: CreateSourcePlan,
1122        global_id: GlobalId,
1123        resolved_ids: ResolvedIds,
1124        custom_logical_compaction_window: Option<CompactionWindow>,
1125        is_retained_metrics_object: bool,
1126    ) -> Source {
1127        Source {
1128            create_sql: Some(plan.source.create_sql),
1129            data_source: match plan.source.data_source {
1130                mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1131                    desc,
1132                    cluster_id: plan
1133                        .in_cluster
1134                        .expect("ingestion-based sources must be given a cluster ID"),
1135                },
1136                mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1137                    desc,
1138                    progress_subsource,
1139                    data_config,
1140                    details,
1141                } => DataSourceDesc::OldSyntaxIngestion {
1142                    desc,
1143                    cluster_id: plan
1144                        .in_cluster
1145                        .expect("ingestion-based sources must be given a cluster ID"),
1146                    progress_subsource,
1147                    data_config,
1148                    details,
1149                },
1150                mz_sql::plan::DataSourceDesc::Progress => {
1151                    assert!(
1152                        plan.in_cluster.is_none(),
1153                        "subsources must not have a host config or cluster_id defined"
1154                    );
1155                    DataSourceDesc::Progress
1156                }
1157                mz_sql::plan::DataSourceDesc::IngestionExport {
1158                    ingestion_id,
1159                    external_reference,
1160                    details,
1161                    data_config,
1162                } => {
1163                    assert!(
1164                        plan.in_cluster.is_none(),
1165                        "subsources must not have a host config or cluster_id defined"
1166                    );
1167                    DataSourceDesc::IngestionExport {
1168                        ingestion_id,
1169                        external_reference,
1170                        details,
1171                        data_config,
1172                    }
1173                }
1174                mz_sql::plan::DataSourceDesc::Webhook {
1175                    validate_using,
1176                    body_format,
1177                    headers,
1178                    cluster_id,
1179                } => {
1180                    mz_ore::soft_assert_or_log!(
1181                        cluster_id.is_none(),
1182                        "cluster_id set at Source level for Webhooks"
1183                    );
1184                    DataSourceDesc::Webhook {
1185                        validate_using,
1186                        body_format,
1187                        headers,
1188                        cluster_id: plan
1189                            .in_cluster
1190                            .expect("webhook sources must be given a cluster ID"),
1191                    }
1192                }
1193            },
1194            desc: plan.source.desc,
1195            global_id,
1196            timeline: plan.timeline,
1197            resolved_ids,
1198            custom_logical_compaction_window: plan
1199                .source
1200                .compaction_window
1201                .or(custom_logical_compaction_window),
1202            is_retained_metrics_object,
1203        }
1204    }
1205
1206    /// Type of the source.
1207    pub fn source_type(&self) -> &str {
1208        match &self.data_source {
1209            DataSourceDesc::Ingestion { desc, .. }
1210            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
1211            DataSourceDesc::Progress => "progress",
1212            DataSourceDesc::IngestionExport { .. } => "subsource",
1213            DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => "source",
1214            DataSourceDesc::Webhook { .. } => "webhook",
1215        }
1216    }
1217
1218    /// Connection ID of the source, if one exists.
1219    pub fn connection_id(&self) -> Option<CatalogItemId> {
1220        match &self.data_source {
1221            DataSourceDesc::Ingestion { desc, .. }
1222            | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
1223            DataSourceDesc::IngestionExport { .. }
1224            | DataSourceDesc::Introspection(_)
1225            | DataSourceDesc::Webhook { .. }
1226            | DataSourceDesc::Progress
1227            | DataSourceDesc::Catalog => None,
1228        }
1229    }
1230
1231    /// The single [`GlobalId`] that refers to this Source.
1232    pub fn global_id(&self) -> GlobalId {
1233        self.global_id
1234    }
1235
1236    /// The expensive resource that each source consumes is persist shards. To
1237    /// prevent abuse, we want to prevent users from creating sources that use an
1238    /// unbounded number of persist shards. But we also don't want to count
1239    /// persist shards that are mandated by the system (e.g., the progress
1240    /// shard) so that future versions of Materialize can introduce additional
1241    /// per-source shards (e.g., a per-source status shard) without impacting
1242    /// the limit calculation.
1243    pub fn user_controllable_persist_shard_count(&self) -> i64 {
1244        match &self.data_source {
1245            DataSourceDesc::Ingestion { .. } => 0,
1246            DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1247                match &desc.connection {
1248                    // These multi-output sources do not use their primary
1249                    // source's data shard, so we don't include it in accounting
1250                    // for users.
1251                    GenericSourceConnection::Postgres(_)
1252                    | GenericSourceConnection::MySql(_)
1253                    | GenericSourceConnection::SqlServer(_) => 0,
1254                    GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
1255                        // Load generators that output data in their primary shard
1256                        LoadGenerator::Clock
1257                        | LoadGenerator::Counter { .. }
1258                        | LoadGenerator::Datums
1259                        | LoadGenerator::KeyValue(_) => 1,
1260                        LoadGenerator::Auction
1261                        | LoadGenerator::Marketing
1262                        | LoadGenerator::Tpch { .. } => 0,
1263                    },
1264                    GenericSourceConnection::Kafka(_) => 1,
1265                }
1266            }
1267            //  DataSourceDesc::IngestionExport represents a subsource, which
1268            //  use a data shard.
1269            DataSourceDesc::IngestionExport { .. } => 1,
1270            DataSourceDesc::Webhook { .. } => 1,
1271            // Introspection, catalog, and progress subsources are not under the user's control, so
1272            // shouldn't count toward their quota.
1273            DataSourceDesc::Introspection(_)
1274            | DataSourceDesc::Progress
1275            | DataSourceDesc::Catalog => 0,
1276        }
1277    }
1278}
1279
1280#[derive(Debug, Clone, Serialize)]
1281pub struct Log {
1282    /// The category of data this log stores.
1283    pub variant: LogVariant,
1284    /// [`GlobalId`] used to reference this log from outside the catalog.
1285    pub global_id: GlobalId,
1286}
1287
1288impl Log {
1289    /// The single [`GlobalId`] that refers to this Log.
1290    pub fn global_id(&self) -> GlobalId {
1291        self.global_id
1292    }
1293}
1294
1295#[derive(Debug, Clone, Serialize)]
1296pub struct Sink {
1297    /// Parse-able SQL that defines this sink.
1298    pub create_sql: String,
1299    /// [`GlobalId`] used to reference this sink from outside the catalog, e.g storage.
1300    pub global_id: GlobalId,
1301    /// Collection we read into this sink.
1302    pub from: GlobalId,
1303    /// Connection to the external service we're sinking into, e.g. Kafka.
1304    pub connection: StorageSinkConnection<ReferencedConnection>,
1305    /// Envelope we use to sink into the external system.
1306    ///
1307    /// TODO(guswynn): this probably should just be in the `connection`.
1308    pub envelope: SinkEnvelope,
1309    /// Emit an initial snapshot into the sink.
1310    pub with_snapshot: bool,
1311    /// Used to fence other writes into this sink as we evolve the upstream materialized view.
1312    pub version: u64,
1313    /// Other catalog objects this sink references.
1314    pub resolved_ids: ResolvedIds,
1315    /// Cluster this sink runs on.
1316    pub cluster_id: ClusterId,
1317    /// Commit interval for the sink.
1318    pub commit_interval: Option<Duration>,
1319}
1320
1321impl Sink {
1322    pub fn sink_type(&self) -> &str {
1323        self.connection.name()
1324    }
1325
1326    /// Envelope of the sink.
1327    pub fn envelope(&self) -> Option<&str> {
1328        match &self.envelope {
1329            SinkEnvelope::Debezium => Some("debezium"),
1330            SinkEnvelope::Upsert => Some("upsert"),
1331            SinkEnvelope::Append => Some("append"),
1332        }
1333    }
1334
1335    /// Output a combined format string of the sink. For legacy reasons
1336    /// if the key-format is none or the key & value formats are
1337    /// both the same (either avro or json), we return the value format name,
1338    /// otherwise we return a composite name.
1339    pub fn combined_format(&self) -> Option<Cow<'_, str>> {
1340        match &self.connection {
1341            StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
1342            StorageSinkConnection::Iceberg(_) => None,
1343        }
1344    }
1345
1346    /// Output distinct key_format and value_format of the sink.
1347    pub fn formats(&self) -> Option<(Option<&str>, &str)> {
1348        match &self.connection {
1349            StorageSinkConnection::Kafka(connection) => {
1350                let key_format = connection
1351                    .format
1352                    .key_format
1353                    .as_ref()
1354                    .map(|f| f.get_format_name());
1355                let value_format = connection.format.value_format.get_format_name();
1356                Some((key_format, value_format))
1357            }
1358            StorageSinkConnection::Iceberg(_) => None,
1359        }
1360    }
1361
1362    pub fn connection_id(&self) -> Option<CatalogItemId> {
1363        self.connection.connection_id()
1364    }
1365
1366    /// The single [`GlobalId`] that this Sink can be referenced by.
1367    pub fn global_id(&self) -> GlobalId {
1368        self.global_id
1369    }
1370}
1371
1372#[derive(Debug, Clone, Serialize)]
1373pub struct View {
1374    /// Parse-able SQL that defines this view.
1375    pub create_sql: String,
1376    /// [`GlobalId`] used to reference this view from outside the catalog, e.g. compute.
1377    pub global_id: GlobalId,
1378    /// Unoptimized high-level expression from parsing the `create_sql`.
1379    pub raw_expr: Arc<HirRelationExpr>,
1380    /// Optimized mid-level expression from (locally) optimizing the `raw_expr`.
1381    pub locally_optimized_expr: Arc<OptimizedMirRelationExpr>,
1382    /// Columns of this view.
1383    pub desc: RelationDesc,
1384    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
1385    pub conn_id: Option<ConnectionId>,
1386    /// Other catalog objects that are referenced by this view, determined at name resolution.
1387    pub resolved_ids: ResolvedIds,
1388    /// All of the catalog objects that are referenced by this view.
1389    pub dependencies: DependencyIds,
1390}
1391
1392impl View {
1393    /// The single [`GlobalId`] this [`View`] can be referenced by.
1394    pub fn global_id(&self) -> GlobalId {
1395        self.global_id
1396    }
1397}
1398
1399#[derive(Debug, Clone, Serialize)]
1400pub struct MaterializedView {
1401    /// Parse-able SQL that defines this materialized view.
1402    pub create_sql: String,
1403    /// Versions of this materialized view, and the [`GlobalId`]s that refer to them.
1404    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
1405    pub collections: BTreeMap<RelationVersion, GlobalId>,
1406    /// Raw high-level expression from planning, derived from the `create_sql`.
1407    pub raw_expr: Arc<HirRelationExpr>,
1408    /// Optimized mid-level expression, derived from the `raw_expr`.
1409    pub locally_optimized_expr: Arc<OptimizedMirRelationExpr>,
1410    /// [`VersionedRelationDesc`] of this materialized view, derived from the `create_sql`.
1411    pub desc: VersionedRelationDesc,
1412    /// Other catalog items that this materialized view references, determined at name resolution.
1413    pub resolved_ids: ResolvedIds,
1414    /// All of the catalog objects that are referenced by this view.
1415    pub dependencies: DependencyIds,
1416    /// ID of the materialized view this materialized view is intended to replace.
1417    pub replacement_target: Option<CatalogItemId>,
1418    /// Cluster that this materialized view runs on.
1419    pub cluster_id: ClusterId,
1420    /// If set, only install this materialized view's dataflow on the specified replica.
1421    pub target_replica: Option<ReplicaId>,
1422    /// Column indexes that we assert are not `NULL`.
1423    ///
1424    /// TODO(parkmycar): Switch this to use the `ColumnIdx` type.
1425    pub non_null_assertions: Vec<usize>,
1426    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
1427    pub custom_logical_compaction_window: Option<CompactionWindow>,
1428    /// Schedule to refresh this materialized view, e.g. set via `REFRESH EVERY` option.
1429    pub refresh_schedule: Option<RefreshSchedule>,
1430    /// The initial `as_of` of the storage collection associated with the materialized view.
1431    ///
1432    /// Note: This doesn't change upon restarts.
1433    /// (The dataflow's initial `as_of` can be different.)
1434    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
1435    // The catalog `dump` method uses serde to serialize catalog state, e.g., Testdrive catalog
1436    // consistency checks do two dumps and compare them. One of these states comes from the durable
1437    // catalog, but the following fields are not restored when the consistency check loads the
1438    // durable catalog, hence we need `#[serde(skip)]`.
1439    /// Optimized global MIR plan, set after global optimization.
1440    #[serde(skip)]
1441    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
1442    /// Physical (LIR) plan, set after physical optimization.
1443    #[serde(skip)]
1444    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
1445    /// Dataflow metainfo (optimizer notices, etc.), set after optimization.
1446    #[serde(skip)]
1447    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
1448}
1449
1450impl MaterializedView {
1451    /// Returns all [`GlobalId`]s that this [`MaterializedView`] can be referenced by.
1452    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1453        self.collections.values().copied()
1454    }
1455
1456    /// The latest [`GlobalId`] for this [`MaterializedView`] which represents the writing
1457    /// version.
1458    pub fn global_id_writes(&self) -> GlobalId {
1459        *self
1460            .collections
1461            .last_key_value()
1462            .expect("at least one version of a materialized view")
1463            .1
1464    }
1465
1466    /// Returns all collections and their [`RelationDesc`]s associated with this [`MaterializedView`].
1467    pub fn collection_descs(
1468        &self,
1469    ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
1470        self.collections.iter().map(|(version, gid)| {
1471            let desc = self
1472                .desc
1473                .at_version(RelationVersionSelector::Specific(*version));
1474            (*gid, *version, desc)
1475        })
1476    }
1477
1478    /// Returns the [`RelationDesc`] for a specific [`GlobalId`].
1479    pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
1480        let (version, _gid) = self
1481            .collections
1482            .iter()
1483            .find(|(_version, gid)| *gid == id)
1484            .expect("GlobalId to exist");
1485        self.desc
1486            .at_version(RelationVersionSelector::Specific(*version))
1487    }
1488
1489    /// Apply the given replacement materialized view to this [`MaterializedView`].
1490    pub fn apply_replacement(&mut self, replacement: Self) {
1491        let target_id = replacement
1492            .replacement_target
1493            .expect("replacement has target");
1494
1495        fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
1496            let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
1497                panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
1498            });
1499            if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
1500                cmvs
1501            } else {
1502                panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
1503            }
1504        }
1505
1506        let old_stmt = parse(&self.create_sql);
1507        let rpl_stmt = parse(&replacement.create_sql);
1508        let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
1509            if_exists: old_stmt.if_exists,
1510            name: old_stmt.name,
1511            columns: rpl_stmt.columns,
1512            replacement_for: None,
1513            in_cluster: rpl_stmt.in_cluster,
1514            in_cluster_replica: rpl_stmt.in_cluster_replica,
1515            query: rpl_stmt.query,
1516            as_of: rpl_stmt.as_of,
1517            with_options: rpl_stmt.with_options,
1518        };
1519        let create_sql = new_stmt.to_ast_string_stable();
1520
1521        let mut collections = std::mem::take(&mut self.collections);
1522        // Note: We can't use `self.desc.latest_version` here because a replacement doesn't
1523        // necessary evolve the relation schema, so that version might be lower than the actual
1524        // latest version.
1525        let latest_version = collections.keys().max().expect("at least one version");
1526        let new_version = latest_version.bump();
1527        collections.insert(new_version, replacement.global_id_writes());
1528
1529        let mut resolved_ids = replacement.resolved_ids;
1530        resolved_ids.remove_item(&target_id);
1531        let mut dependencies = replacement.dependencies;
1532        dependencies.0.remove(&target_id);
1533
1534        *self = Self {
1535            create_sql,
1536            collections,
1537            raw_expr: replacement.raw_expr,
1538            locally_optimized_expr: replacement.locally_optimized_expr,
1539            desc: replacement.desc,
1540            resolved_ids,
1541            dependencies,
1542            replacement_target: None,
1543            cluster_id: replacement.cluster_id,
1544            target_replica: replacement.target_replica,
1545            non_null_assertions: replacement.non_null_assertions,
1546            custom_logical_compaction_window: replacement.custom_logical_compaction_window,
1547            refresh_schedule: replacement.refresh_schedule,
1548            initial_as_of: replacement.initial_as_of,
1549            optimized_plan: replacement.optimized_plan,
1550            physical_plan: replacement.physical_plan,
1551            dataflow_metainfo: replacement.dataflow_metainfo,
1552        };
1553    }
1554}
1555
1556#[derive(Debug, Clone, Serialize)]
1557pub struct Index {
1558    /// Parse-able SQL that defines this table.
1559    pub create_sql: String,
1560    /// [`GlobalId`] used to reference this index from outside the catalog, e.g. compute.
1561    pub global_id: GlobalId,
1562    /// The [`GlobalId`] this Index is on.
1563    pub on: GlobalId,
1564    /// Keys of the index.
1565    pub keys: Arc<[MirScalarExpr]>,
1566    /// If created in the `TEMPORARY` schema, the [`ConnectionId`] for that session.
1567    pub conn_id: Option<ConnectionId>,
1568    /// Other catalog objects referenced by this index, e.g. the object we're indexing.
1569    pub resolved_ids: ResolvedIds,
1570    /// Cluster this index is installed on.
1571    pub cluster_id: ClusterId,
1572    /// Custom compaction window, e.g. set via `ALTER RETAIN HISTORY`.
1573    pub custom_logical_compaction_window: Option<CompactionWindow>,
1574    /// Whether the table's logical compaction window is controlled by the ['metrics_retention']
1575    /// session variable.
1576    ///
1577    /// ['metrics_retention']: mz_sql::session::vars::METRICS_RETENTION
1578    pub is_retained_metrics_object: bool,
1579    // The catalog `dump` method uses serde to serialize catalog state, e.g., Testdrive catalog
1580    // consistency checks do two dumps and compare them. One of these states comes from the durable
1581    // catalog, but the following fields are not restored when the consistency check loads the
1582    // durable catalog, hence we need `#[serde(skip)]`.
1583    /// Optimized global MIR plan, set after global optimization.
1584    #[serde(skip)]
1585    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
1586    /// Physical (LIR) plan, set after physical optimization.
1587    #[serde(skip)]
1588    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
1589    /// Dataflow metainfo (optimizer notices, etc.), set after optimization.
1590    #[serde(skip)]
1591    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
1592}
1593
1594impl Index {
1595    /// The [`GlobalId`] that refers to this Index.
1596    pub fn global_id(&self) -> GlobalId {
1597        self.global_id
1598    }
1599}
1600
1601#[derive(Debug, Clone, Serialize)]
1602pub struct Type {
1603    /// Parse-able SQL that defines this type.
1604    pub create_sql: Option<String>,
1605    /// [`GlobalId`] used to reference this type from outside the catalog.
1606    pub global_id: GlobalId,
1607    #[serde(skip)]
1608    pub details: CatalogTypeDetails<IdReference>,
1609    /// Other catalog objects referenced by this type.
1610    pub resolved_ids: ResolvedIds,
1611}
1612
1613#[derive(Debug, Clone, Serialize)]
1614pub struct Func {
1615    /// Static definition of the function.
1616    #[serde(skip)]
1617    pub inner: &'static mz_sql::func::Func,
1618    /// [`GlobalId`] used to reference this function from outside the catalog.
1619    pub global_id: GlobalId,
1620}
1621
1622#[derive(Debug, Clone, Serialize)]
1623pub struct Secret {
1624    /// Parse-able SQL that defines this secret.
1625    pub create_sql: String,
1626    /// [`GlobalId`] used to reference this secret from outside the catalog.
1627    pub global_id: GlobalId,
1628}
1629
1630#[derive(Debug, Clone, Serialize)]
1631pub struct Connection {
1632    /// Parse-able SQL that defines this connection.
1633    pub create_sql: String,
1634    /// [`GlobalId`] used to reference this connection from the storage layer.
1635    pub global_id: GlobalId,
1636    /// The kind of connection.
1637    pub details: ConnectionDetails,
1638    /// Other objects this connection depends on.
1639    pub resolved_ids: ResolvedIds,
1640}
1641
1642impl Connection {
1643    /// The single [`GlobalId`] used to reference this connection.
1644    pub fn global_id(&self) -> GlobalId {
1645        self.global_id
1646    }
1647}
1648
1649#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
1650pub struct NetworkPolicy {
1651    pub name: String,
1652    pub id: NetworkPolicyId,
1653    pub oid: u32,
1654    pub rules: Vec<NetworkPolicyRule>,
1655    pub owner_id: RoleId,
1656    pub privileges: PrivilegeMap,
1657}
1658
1659impl From<NetworkPolicy> for durable::NetworkPolicy {
1660    fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1661        durable::NetworkPolicy {
1662            id: policy.id,
1663            oid: policy.oid,
1664            name: policy.name,
1665            rules: policy.rules,
1666            owner_id: policy.owner_id,
1667            privileges: policy.privileges.into_all_values().collect(),
1668        }
1669    }
1670}
1671
1672impl From<durable::NetworkPolicy> for NetworkPolicy {
1673    fn from(
1674        durable::NetworkPolicy {
1675            id,
1676            oid,
1677            name,
1678            rules,
1679            owner_id,
1680            privileges,
1681        }: durable::NetworkPolicy,
1682    ) -> Self {
1683        NetworkPolicy {
1684            id,
1685            oid,
1686            name,
1687            rules,
1688            owner_id,
1689            privileges: PrivilegeMap::from_mz_acl_items(privileges),
1690        }
1691    }
1692}
1693
1694impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1695    fn update_from(
1696        &mut self,
1697        durable::NetworkPolicy {
1698            id,
1699            oid,
1700            name,
1701            rules,
1702            owner_id,
1703            privileges,
1704        }: durable::NetworkPolicy,
1705    ) {
1706        self.id = id;
1707        self.oid = oid;
1708        self.name = name;
1709        self.rules = rules;
1710        self.owner_id = owner_id;
1711        self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1712    }
1713}
1714
1715impl CatalogItem {
1716    /// Returns a string indicating the type of this catalog entry.
1717    pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1718        match self {
1719            CatalogItem::Table(_) => CatalogItemType::Table,
1720            CatalogItem::Source(_) => CatalogItemType::Source,
1721            CatalogItem::Log(_) => CatalogItemType::Source,
1722            CatalogItem::Sink(_) => CatalogItemType::Sink,
1723            CatalogItem::View(_) => CatalogItemType::View,
1724            CatalogItem::MaterializedView(_) => CatalogItemType::MaterializedView,
1725            CatalogItem::Index(_) => CatalogItemType::Index,
1726            CatalogItem::Type(_) => CatalogItemType::Type,
1727            CatalogItem::Func(_) => CatalogItemType::Func,
1728            CatalogItem::Secret(_) => CatalogItemType::Secret,
1729            CatalogItem::Connection(_) => CatalogItemType::Connection,
1730        }
1731    }
1732
1733    /// Returns the [`GlobalId`]s that reference this item, if any.
1734    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1735        let gid = match self {
1736            CatalogItem::Source(source) => source.global_id,
1737            CatalogItem::Log(log) => log.global_id,
1738            CatalogItem::Sink(sink) => sink.global_id,
1739            CatalogItem::View(view) => view.global_id,
1740            CatalogItem::MaterializedView(mv) => {
1741                return itertools::Either::Left(mv.collections.values().copied());
1742            }
1743            CatalogItem::Index(index) => index.global_id,
1744            CatalogItem::Func(func) => func.global_id,
1745            CatalogItem::Type(ty) => ty.global_id,
1746            CatalogItem::Secret(secret) => secret.global_id,
1747            CatalogItem::Connection(conn) => conn.global_id,
1748            CatalogItem::Table(table) => {
1749                return itertools::Either::Left(table.collections.values().copied());
1750            }
1751        };
1752        itertools::Either::Right(std::iter::once(gid))
1753    }
1754
1755    /// Returns the most up-to-date [`GlobalId`] for this item.
1756    ///
1757    /// Note: The only type of object that can have multiple [`GlobalId`]s are tables.
1758    pub fn latest_global_id(&self) -> GlobalId {
1759        match self {
1760            CatalogItem::Source(source) => source.global_id,
1761            CatalogItem::Log(log) => log.global_id,
1762            CatalogItem::Sink(sink) => sink.global_id,
1763            CatalogItem::View(view) => view.global_id,
1764            CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
1765            CatalogItem::Index(index) => index.global_id,
1766            CatalogItem::Func(func) => func.global_id,
1767            CatalogItem::Type(ty) => ty.global_id,
1768            CatalogItem::Secret(secret) => secret.global_id,
1769            CatalogItem::Connection(conn) => conn.global_id,
1770            CatalogItem::Table(table) => table.global_id_writes(),
1771        }
1772    }
1773
1774    /// Returns the optimized global MIR plan, if this item has one.
1775    pub fn optimized_plan(&self) -> Option<&Arc<DataflowDescription<OptimizedMirRelationExpr>>> {
1776        match self {
1777            CatalogItem::Index(idx) => idx.optimized_plan.as_ref(),
1778            CatalogItem::MaterializedView(mv) => mv.optimized_plan.as_ref(),
1779            _ => None,
1780        }
1781    }
1782
1783    /// Returns the physical (LIR) plan, if this item has one.
1784    pub fn physical_plan(&self) -> Option<&Arc<DataflowDescription<ComputePlan>>> {
1785        match self {
1786            CatalogItem::Index(idx) => idx.physical_plan.as_ref(),
1787            CatalogItem::MaterializedView(mv) => mv.physical_plan.as_ref(),
1788            _ => None,
1789        }
1790    }
1791
1792    /// Returns the dataflow metainfo, if this item has one.
1793    pub fn dataflow_metainfo(&self) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
1794        match self {
1795            CatalogItem::Index(idx) => idx.dataflow_metainfo.as_ref(),
1796            CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo.as_ref(),
1797            _ => None,
1798        }
1799    }
1800
1801    /// Returns mutable references to the plan fields (`optimized_plan`,
1802    /// `physical_plan`, `dataflow_metainfo`) on plan-bearing items
1803    /// (`Index`, `MaterializedView`), or `None` for
1804    /// other item kinds.
1805    pub fn plan_fields_mut(
1806        &mut self,
1807    ) -> Option<(
1808        &mut Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
1809        &mut Option<Arc<DataflowDescription<ComputePlan>>>,
1810        &mut Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
1811    )> {
1812        match self {
1813            CatalogItem::Index(idx) => Some((
1814                &mut idx.optimized_plan,
1815                &mut idx.physical_plan,
1816                &mut idx.dataflow_metainfo,
1817            )),
1818            CatalogItem::MaterializedView(mv) => Some((
1819                &mut mv.optimized_plan,
1820                &mut mv.physical_plan,
1821                &mut mv.dataflow_metainfo,
1822            )),
1823            _ => None,
1824        }
1825    }
1826
1827    /// Whether this item represents a storage collection.
1828    pub fn is_storage_collection(&self) -> bool {
1829        match self {
1830            CatalogItem::Table(_)
1831            | CatalogItem::Source(_)
1832            | CatalogItem::MaterializedView(_)
1833            | CatalogItem::Sink(_) => true,
1834            CatalogItem::Log(_)
1835            | CatalogItem::View(_)
1836            | CatalogItem::Index(_)
1837            | CatalogItem::Type(_)
1838            | CatalogItem::Func(_)
1839            | CatalogItem::Secret(_)
1840            | CatalogItem::Connection(_) => false,
1841        }
1842    }
1843
1844    /// Returns the [`RelationDesc`] for items that yield rows, at the requested
1845    /// version.
1846    ///
1847    /// Some item types honor `version` so callers can ask for the schema that
1848    /// matches a specific [`GlobalId`] or historical definition. Other relation
1849    /// types ignore `version` because they have a single shape. Non-relational
1850    /// items ( for example functions, indexes, sinks, secrets, and connections)
1851    /// return `None`.
1852    pub fn relation_desc(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1853        match &self {
1854            CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1855            CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1856            CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1857            CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1858            CatalogItem::MaterializedView(mview) => {
1859                Some(Cow::Owned(mview.desc.at_version(version)))
1860            }
1861            CatalogItem::Func(_)
1862            | CatalogItem::Index(_)
1863            | CatalogItem::Sink(_)
1864            | CatalogItem::Secret(_)
1865            | CatalogItem::Connection(_)
1866            | CatalogItem::Type(_) => None,
1867        }
1868    }
1869
1870    pub fn func(
1871        &self,
1872        entry: &CatalogEntry,
1873    ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1874        match &self {
1875            CatalogItem::Func(func) => Ok(func.inner),
1876            _ => Err(SqlCatalogError::UnexpectedType {
1877                name: entry.name().item.to_string(),
1878                actual_type: entry.item_type(),
1879                expected_type: CatalogItemType::Func,
1880            }),
1881        }
1882    }
1883
1884    pub fn source_desc(
1885        &self,
1886        entry: &CatalogEntry,
1887    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1888        match &self {
1889            CatalogItem::Source(source) => match &source.data_source {
1890                DataSourceDesc::Ingestion { desc, .. }
1891                | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1892                DataSourceDesc::IngestionExport { .. }
1893                | DataSourceDesc::Introspection(_)
1894                | DataSourceDesc::Webhook { .. }
1895                | DataSourceDesc::Progress
1896                | DataSourceDesc::Catalog => Ok(None),
1897            },
1898            _ => Err(SqlCatalogError::UnexpectedType {
1899                name: entry.name().item.to_string(),
1900                actual_type: entry.item_type(),
1901                expected_type: CatalogItemType::Source,
1902            }),
1903        }
1904    }
1905
1906    /// Reports whether this catalog entry is a progress source.
1907    pub fn is_progress_source(&self) -> bool {
1908        matches!(
1909            self,
1910            CatalogItem::Source(Source {
1911                data_source: DataSourceDesc::Progress,
1912                ..
1913            })
1914        )
1915    }
1916
1917    /// Collects the identifiers of the objects that were encountered when resolving names in the
1918    /// item's DDL statement.
1919    pub fn references(&self) -> &ResolvedIds {
1920        static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1921        match self {
1922            CatalogItem::Func(_) => &*EMPTY,
1923            CatalogItem::Index(idx) => &idx.resolved_ids,
1924            CatalogItem::Sink(sink) => &sink.resolved_ids,
1925            CatalogItem::Source(source) => &source.resolved_ids,
1926            CatalogItem::Log(_) => &*EMPTY,
1927            CatalogItem::Table(table) => &table.resolved_ids,
1928            CatalogItem::Type(typ) => &typ.resolved_ids,
1929            CatalogItem::View(view) => &view.resolved_ids,
1930            CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1931            CatalogItem::Secret(_) => &*EMPTY,
1932            CatalogItem::Connection(connection) => &connection.resolved_ids,
1933        }
1934    }
1935
1936    /// Collects the identifiers of the objects used by this [`CatalogItem`].
1937    ///
1938    /// Like [`CatalogItem::references()`] but also includes objects that are not directly
1939    /// referenced. For example this will include any catalog objects used to implement functions
1940    /// and casts in the item.
1941    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1942        let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1943        match self {
1944            // TODO(jkosh44) This isn't really correct for functions. They may use other objects in
1945            // their implementation. However, currently there's no way to get that information.
1946            CatalogItem::Func(_) => {}
1947            CatalogItem::Index(_) => {}
1948            CatalogItem::Sink(_) => {}
1949            CatalogItem::Source(_) => {}
1950            CatalogItem::Log(_) => {}
1951            CatalogItem::Table(_) => {}
1952            CatalogItem::Type(_) => {}
1953            CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1954            CatalogItem::MaterializedView(mview) => {
1955                uses.extend(mview.dependencies.0.iter().copied())
1956            }
1957            CatalogItem::Secret(_) => {}
1958            CatalogItem::Connection(_) => {}
1959        }
1960        uses
1961    }
1962
1963    /// Returns the connection ID that this item belongs to, if this item is
1964    /// temporary.
1965    pub fn conn_id(&self) -> Option<&ConnectionId> {
1966        match self {
1967            CatalogItem::View(view) => view.conn_id.as_ref(),
1968            CatalogItem::Index(index) => index.conn_id.as_ref(),
1969            CatalogItem::Table(table) => table.conn_id.as_ref(),
1970            CatalogItem::Log(_)
1971            | CatalogItem::Source(_)
1972            | CatalogItem::Sink(_)
1973            | CatalogItem::MaterializedView(_)
1974            | CatalogItem::Secret(_)
1975            | CatalogItem::Type(_)
1976            | CatalogItem::Func(_)
1977            | CatalogItem::Connection(_) => None,
1978        }
1979    }
1980
1981    /// Sets the connection ID that this item belongs to, which makes it a
1982    /// temporary item.
1983    pub fn set_conn_id(&mut self, conn_id: Option<ConnectionId>) {
1984        match self {
1985            CatalogItem::View(view) => view.conn_id = conn_id,
1986            CatalogItem::Index(index) => index.conn_id = conn_id,
1987            CatalogItem::Table(table) => table.conn_id = conn_id,
1988            CatalogItem::Log(_)
1989            | CatalogItem::Source(_)
1990            | CatalogItem::Sink(_)
1991            | CatalogItem::MaterializedView(_)
1992            | CatalogItem::Secret(_)
1993            | CatalogItem::Type(_)
1994            | CatalogItem::Func(_)
1995            | CatalogItem::Connection(_) => (),
1996        }
1997    }
1998
1999    /// Indicates whether this item is temporary or not.
2000    pub fn is_temporary(&self) -> bool {
2001        self.conn_id().is_some()
2002    }
2003
2004    pub fn rename_schema_refs(
2005        &self,
2006        database_name: &str,
2007        cur_schema_name: &str,
2008        new_schema_name: &str,
2009    ) -> Result<CatalogItem, (String, String)> {
2010        let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
2011            let mut create_stmt = mz_sql::parse::parse(&create_sql)
2012                .expect("invalid create sql persisted to catalog")
2013                .into_element()
2014                .ast;
2015
2016            // Rename all references to cur_schema_name.
2017            mz_sql::ast::transform::create_stmt_rename_schema_refs(
2018                &mut create_stmt,
2019                database_name,
2020                cur_schema_name,
2021                new_schema_name,
2022            )?;
2023
2024            Ok(create_stmt.to_ast_string_stable())
2025        };
2026
2027        match self {
2028            CatalogItem::Table(i) => {
2029                let mut i = i.clone();
2030                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2031                Ok(CatalogItem::Table(i))
2032            }
2033            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
2034            CatalogItem::Source(i) => {
2035                let mut i = i.clone();
2036                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2037                Ok(CatalogItem::Source(i))
2038            }
2039            CatalogItem::Sink(i) => {
2040                let mut i = i.clone();
2041                i.create_sql = do_rewrite(i.create_sql)?;
2042                Ok(CatalogItem::Sink(i))
2043            }
2044            CatalogItem::View(i) => {
2045                let mut i = i.clone();
2046                i.create_sql = do_rewrite(i.create_sql)?;
2047                Ok(CatalogItem::View(i))
2048            }
2049            CatalogItem::MaterializedView(i) => {
2050                let mut i = i.clone();
2051                i.create_sql = do_rewrite(i.create_sql)?;
2052                Ok(CatalogItem::MaterializedView(i))
2053            }
2054            CatalogItem::Index(i) => {
2055                let mut i = i.clone();
2056                i.create_sql = do_rewrite(i.create_sql)?;
2057                Ok(CatalogItem::Index(i))
2058            }
2059            CatalogItem::Secret(i) => {
2060                let mut i = i.clone();
2061                i.create_sql = do_rewrite(i.create_sql)?;
2062                Ok(CatalogItem::Secret(i))
2063            }
2064            CatalogItem::Connection(i) => {
2065                let mut i = i.clone();
2066                i.create_sql = do_rewrite(i.create_sql)?;
2067                Ok(CatalogItem::Connection(i))
2068            }
2069            CatalogItem::Type(i) => {
2070                let mut i = i.clone();
2071                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2072                Ok(CatalogItem::Type(i))
2073            }
2074            CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
2075        }
2076    }
2077
2078    /// Returns a clone of `self` with all instances of `from` renamed to `to`
2079    /// (with the option of including the item's own name) or errors if request
2080    /// is ambiguous.
2081    pub fn rename_item_refs(
2082        &self,
2083        from: FullItemName,
2084        to_item_name: String,
2085        rename_self: bool,
2086    ) -> Result<CatalogItem, String> {
2087        let do_rewrite = |create_sql: String| -> Result<String, String> {
2088            let mut create_stmt = mz_sql::parse::parse(&create_sql)
2089                .expect("invalid create sql persisted to catalog")
2090                .into_element()
2091                .ast;
2092            if rename_self {
2093                mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
2094            }
2095            // Determination of what constitutes an ambiguous request is done here.
2096            mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
2097            Ok(create_stmt.to_ast_string_stable())
2098        };
2099
2100        match self {
2101            CatalogItem::Table(i) => {
2102                let mut i = i.clone();
2103                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2104                Ok(CatalogItem::Table(i))
2105            }
2106            CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
2107            CatalogItem::Source(i) => {
2108                let mut i = i.clone();
2109                i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2110                Ok(CatalogItem::Source(i))
2111            }
2112            CatalogItem::Sink(i) => {
2113                let mut i = i.clone();
2114                i.create_sql = do_rewrite(i.create_sql)?;
2115                Ok(CatalogItem::Sink(i))
2116            }
2117            CatalogItem::View(i) => {
2118                let mut i = i.clone();
2119                i.create_sql = do_rewrite(i.create_sql)?;
2120                Ok(CatalogItem::View(i))
2121            }
2122            CatalogItem::MaterializedView(i) => {
2123                let mut i = i.clone();
2124                i.create_sql = do_rewrite(i.create_sql)?;
2125                Ok(CatalogItem::MaterializedView(i))
2126            }
2127            CatalogItem::Index(i) => {
2128                let mut i = i.clone();
2129                i.create_sql = do_rewrite(i.create_sql)?;
2130                Ok(CatalogItem::Index(i))
2131            }
2132            CatalogItem::Secret(i) => {
2133                let mut i = i.clone();
2134                i.create_sql = do_rewrite(i.create_sql)?;
2135                Ok(CatalogItem::Secret(i))
2136            }
2137            CatalogItem::Func(_) | CatalogItem::Type(_) => {
2138                unreachable!("{}s cannot be renamed", self.typ())
2139            }
2140            CatalogItem::Connection(i) => {
2141                let mut i = i.clone();
2142                i.create_sql = do_rewrite(i.create_sql)?;
2143                Ok(CatalogItem::Connection(i))
2144            }
2145        }
2146    }
2147
2148    /// Returns a clone of `self` with all instances of `old_id` replaced with `new_id`.
2149    pub fn replace_item_refs(&self, old_id: CatalogItemId, new_id: CatalogItemId) -> CatalogItem {
2150        let do_rewrite = |create_sql: String| -> String {
2151            let mut create_stmt = mz_sql::parse::parse(&create_sql)
2152                .expect("invalid create sql persisted to catalog")
2153                .into_element()
2154                .ast;
2155            mz_sql::ast::transform::create_stmt_replace_ids(
2156                &mut create_stmt,
2157                &[(old_id, new_id)].into(),
2158            );
2159            create_stmt.to_ast_string_stable()
2160        };
2161
2162        match self {
2163            CatalogItem::Table(i) => {
2164                let mut i = i.clone();
2165                i.create_sql = i.create_sql.map(do_rewrite);
2166                CatalogItem::Table(i)
2167            }
2168            CatalogItem::Log(i) => CatalogItem::Log(i.clone()),
2169            CatalogItem::Source(i) => {
2170                let mut i = i.clone();
2171                i.create_sql = i.create_sql.map(do_rewrite);
2172                CatalogItem::Source(i)
2173            }
2174            CatalogItem::Sink(i) => {
2175                let mut i = i.clone();
2176                i.create_sql = do_rewrite(i.create_sql);
2177                CatalogItem::Sink(i)
2178            }
2179            CatalogItem::View(i) => {
2180                let mut i = i.clone();
2181                i.create_sql = do_rewrite(i.create_sql);
2182                CatalogItem::View(i)
2183            }
2184            CatalogItem::MaterializedView(i) => {
2185                let mut i = i.clone();
2186                i.create_sql = do_rewrite(i.create_sql);
2187                CatalogItem::MaterializedView(i)
2188            }
2189            CatalogItem::Index(i) => {
2190                let mut i = i.clone();
2191                i.create_sql = do_rewrite(i.create_sql);
2192                CatalogItem::Index(i)
2193            }
2194            CatalogItem::Secret(i) => {
2195                let mut i = i.clone();
2196                i.create_sql = do_rewrite(i.create_sql);
2197                CatalogItem::Secret(i)
2198            }
2199            CatalogItem::Func(_) | CatalogItem::Type(_) => {
2200                unreachable!("references of {}s cannot be replaced", self.typ())
2201            }
2202            CatalogItem::Connection(i) => {
2203                let mut i = i.clone();
2204                i.create_sql = do_rewrite(i.create_sql);
2205                CatalogItem::Connection(i)
2206            }
2207        }
2208    }
2209    /// Updates the retain history for an item. Returns the previous retain history value. Returns
2210    /// an error if this item does not support retain history.
2211    pub fn update_retain_history(
2212        &mut self,
2213        value: Option<Value>,
2214        window: CompactionWindow,
2215    ) -> Result<Option<WithOptionValue<Raw>>, ()> {
2216        let update = |mut ast: &mut Statement<Raw>| {
2217            // Each statement type has unique option types. This macro handles them commonly.
2218            macro_rules! update_retain_history {
2219                ( $stmt:ident, $opt:ident, $name:ident ) => {{
2220                    // Replace or add the option.
2221                    let pos = $stmt
2222                        .with_options
2223                        .iter()
2224                        // In case there are ever multiple, look for the last one.
2225                        .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
2226                    if let Some(value) = value {
2227                        let next = mz_sql_parser::ast::$opt {
2228                            name: mz_sql_parser::ast::$name::RetainHistory,
2229                            value: Some(WithOptionValue::RetainHistoryFor(value)),
2230                        };
2231                        if let Some(idx) = pos {
2232                            let previous = $stmt.with_options[idx].clone();
2233                            $stmt.with_options[idx] = next;
2234                            previous.value
2235                        } else {
2236                            $stmt.with_options.push(next);
2237                            None
2238                        }
2239                    } else {
2240                        if let Some(idx) = pos {
2241                            $stmt.with_options.swap_remove(idx).value
2242                        } else {
2243                            None
2244                        }
2245                    }
2246                }};
2247            }
2248            let previous = match &mut ast {
2249                Statement::CreateTable(stmt) => {
2250                    update_retain_history!(stmt, TableOption, TableOptionName)
2251                }
2252                Statement::CreateIndex(stmt) => {
2253                    update_retain_history!(stmt, IndexOption, IndexOptionName)
2254                }
2255                Statement::CreateSource(stmt) => {
2256                    update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
2257                }
2258                Statement::CreateMaterializedView(stmt) => {
2259                    update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
2260                }
2261                _ => {
2262                    return Err(());
2263                }
2264            };
2265            Ok(previous)
2266        };
2267
2268        let res = self.update_sql(update)?;
2269        let cw = self
2270            .custom_logical_compaction_window_mut()
2271            .expect("item must have compaction window");
2272        *cw = Some(window);
2273        Ok(res)
2274    }
2275
2276    /// Updates the timestamp interval for a source. Returns the previous timestamp interval
2277    /// value, if any. Returns an error if this item is not a source.
2278    pub fn update_timestamp_interval(
2279        &mut self,
2280        value: Option<Value>,
2281        interval: Duration,
2282    ) -> Result<Option<WithOptionValue<Raw>>, ()> {
2283        let update = |ast: &mut Statement<Raw>| match ast {
2284            Statement::CreateSource(stmt) => {
2285                let pos = stmt.with_options.iter().rposition(|o| {
2286                    o.name == mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval
2287                });
2288                let previous = if let Some(value) = value {
2289                    let next = mz_sql_parser::ast::CreateSourceOption {
2290                        name: mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval,
2291                        value: Some(WithOptionValue::Value(value)),
2292                    };
2293                    if let Some(idx) = pos {
2294                        let previous = stmt.with_options[idx].clone();
2295                        stmt.with_options[idx] = next;
2296                        previous.value
2297                    } else {
2298                        stmt.with_options.push(next);
2299                        None
2300                    }
2301                } else if let Some(idx) = pos {
2302                    stmt.with_options.swap_remove(idx).value
2303                } else {
2304                    None
2305                };
2306                Ok(previous)
2307            }
2308            _ => Err(()),
2309        };
2310
2311        let previous = self.update_sql(update)?;
2312
2313        // Update the in-memory SourceDesc timestamp_interval.
2314        match self {
2315            CatalogItem::Source(source) => {
2316                match &mut source.data_source {
2317                    DataSourceDesc::Ingestion { desc, .. }
2318                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
2319                        desc.timestamp_interval = interval;
2320                    }
2321                    _ => return Err(()),
2322                }
2323                Ok(previous)
2324            }
2325            _ => Err(()),
2326        }
2327    }
2328
2329    pub fn add_column(
2330        &mut self,
2331        name: ColumnName,
2332        typ: SqlColumnType,
2333        sql: RawDataType,
2334    ) -> Result<RelationVersion, PlanError> {
2335        let CatalogItem::Table(table) = self else {
2336            return Err(PlanError::Unsupported {
2337                feature: "adding columns to a non-Table".to_string(),
2338                discussion_no: None,
2339            });
2340        };
2341        let next_version = table.desc.add_column(name.clone(), typ);
2342
2343        let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2344            Statement::CreateTable(stmt) => {
2345                let version = ColumnOptionDef {
2346                    name: None,
2347                    option: ColumnOption::Versioned {
2348                        action: ColumnVersioned::Added,
2349                        version: next_version.into(),
2350                    },
2351                };
2352                let column = ColumnDef {
2353                    name: name.into(),
2354                    data_type: sql,
2355                    collation: None,
2356                    options: vec![version],
2357                };
2358                stmt.columns.push(column);
2359                Ok(())
2360            }
2361            _ => Err(()),
2362        };
2363
2364        self.update_sql(update)
2365            .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2366        Ok(next_version)
2367    }
2368
2369    /// Updates the create_sql field of this item. Returns an error if this is a builtin item,
2370    /// otherwise returns f's result.
2371    pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2372    where
2373        F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2374    {
2375        let create_sql = match self {
2376            CatalogItem::Table(Table { create_sql, .. })
2377            | CatalogItem::Type(Type { create_sql, .. })
2378            | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2379            CatalogItem::Sink(Sink { create_sql, .. })
2380            | CatalogItem::View(View { create_sql, .. })
2381            | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2382            | CatalogItem::Index(Index { create_sql, .. })
2383            | CatalogItem::Secret(Secret { create_sql, .. })
2384            | CatalogItem::Connection(Connection { create_sql, .. }) => Some(create_sql),
2385            CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2386        };
2387        let Some(create_sql) = create_sql else {
2388            return Err(());
2389        };
2390        let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2391            .expect("non-system items must be parseable")
2392            .into_element()
2393            .ast;
2394        debug!("rewrite: {}", ast.to_ast_string_redacted());
2395        let t = f(&mut ast)?;
2396        *create_sql = ast.to_ast_string_stable();
2397        debug!("rewrote: {}", ast.to_ast_string_redacted());
2398        Ok(t)
2399    }
2400
2401    /// If the object is considered a "compute object"
2402    /// (i.e., it is managed by the compute controller),
2403    /// this function returns its cluster ID. Otherwise, it returns nothing.
2404    ///
2405    /// This function differs from `cluster_id` because while all
2406    /// compute objects run on a cluster, the converse is not true.
2407    pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2408        match self {
2409            CatalogItem::Index(index) => Some(index.cluster_id),
2410            CatalogItem::Table(_)
2411            | CatalogItem::Source(_)
2412            | CatalogItem::Log(_)
2413            | CatalogItem::View(_)
2414            | CatalogItem::MaterializedView(_)
2415            | CatalogItem::Sink(_)
2416            | CatalogItem::Type(_)
2417            | CatalogItem::Func(_)
2418            | CatalogItem::Secret(_)
2419            | CatalogItem::Connection(_) => None,
2420        }
2421    }
2422
2423    pub fn cluster_id(&self) -> Option<ClusterId> {
2424        match self {
2425            CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2426            CatalogItem::Index(index) => Some(index.cluster_id),
2427            CatalogItem::Source(source) => match &source.data_source {
2428                DataSourceDesc::Ingestion { cluster_id, .. }
2429                | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
2430                // This is somewhat of a lie because the export runs on the same
2431                // cluster as its ingestion but we don't yet have a way of
2432                // cross-referencing the items
2433                DataSourceDesc::IngestionExport { .. } => None,
2434                DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2435                DataSourceDesc::Introspection(_)
2436                | DataSourceDesc::Progress
2437                | DataSourceDesc::Catalog => None,
2438            },
2439            CatalogItem::Sink(sink) => Some(sink.cluster_id),
2440            CatalogItem::Table(_)
2441            | CatalogItem::Log(_)
2442            | CatalogItem::View(_)
2443            | CatalogItem::Type(_)
2444            | CatalogItem::Func(_)
2445            | CatalogItem::Secret(_)
2446            | CatalogItem::Connection(_) => None,
2447        }
2448    }
2449
2450    /// The custom compaction window, if any has been set. This does not reflect any propagated
2451    /// compaction window (i.e., source -> subsource).
2452    pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2453        match self {
2454            CatalogItem::Table(table) => table.custom_logical_compaction_window,
2455            CatalogItem::Source(source) => source.custom_logical_compaction_window,
2456            CatalogItem::Index(index) => index.custom_logical_compaction_window,
2457            CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2458            CatalogItem::Log(_)
2459            | CatalogItem::View(_)
2460            | CatalogItem::Sink(_)
2461            | CatalogItem::Type(_)
2462            | CatalogItem::Func(_)
2463            | CatalogItem::Secret(_)
2464            | CatalogItem::Connection(_) => None,
2465        }
2466    }
2467
2468    /// Mutable access to the custom compaction window, or None if this type does not support custom
2469    /// compaction windows. This does not reflect any propagated compaction window (i.e., source ->
2470    /// subsource).
2471    pub fn custom_logical_compaction_window_mut(
2472        &mut self,
2473    ) -> Option<&mut Option<CompactionWindow>> {
2474        let cw = match self {
2475            CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2476            CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2477            CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2478            CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2479            CatalogItem::Log(_)
2480            | CatalogItem::View(_)
2481            | CatalogItem::Sink(_)
2482            | CatalogItem::Type(_)
2483            | CatalogItem::Func(_)
2484            | CatalogItem::Secret(_)
2485            | CatalogItem::Connection(_) => return None,
2486        };
2487        Some(cw)
2488    }
2489
2490    /// The initial compaction window, for objects that have one; that is, tables, sources, indexes,
2491    /// and MVs. This does not reflect any propagated compaction window (i.e., source -> subsource).
2492    ///
2493    /// If `custom_logical_compaction_window()` returns something, use that.  Otherwise, use a
2494    /// sensible default (currently 1s).
2495    ///
2496    /// For objects that do not have the concept of compaction window, return None.
2497    pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2498        let custom_logical_compaction_window = match self {
2499            CatalogItem::Table(_)
2500            | CatalogItem::Source(_)
2501            | CatalogItem::Index(_)
2502            | CatalogItem::MaterializedView(_) => self.custom_logical_compaction_window(),
2503            CatalogItem::Log(_)
2504            | CatalogItem::View(_)
2505            | CatalogItem::Sink(_)
2506            | CatalogItem::Type(_)
2507            | CatalogItem::Func(_)
2508            | CatalogItem::Secret(_)
2509            | CatalogItem::Connection(_) => return None,
2510        };
2511        Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2512    }
2513
2514    /// Whether the item's logical compaction window
2515    /// is controlled by the METRICS_RETENTION
2516    /// system var.
2517    pub fn is_retained_metrics_object(&self) -> bool {
2518        match self {
2519            CatalogItem::Table(table) => table.is_retained_metrics_object,
2520            CatalogItem::Source(source) => source.is_retained_metrics_object,
2521            CatalogItem::Index(index) => index.is_retained_metrics_object,
2522            CatalogItem::Log(_)
2523            | CatalogItem::View(_)
2524            | CatalogItem::MaterializedView(_)
2525            | CatalogItem::Sink(_)
2526            | CatalogItem::Type(_)
2527            | CatalogItem::Func(_)
2528            | CatalogItem::Secret(_)
2529            | CatalogItem::Connection(_) => false,
2530        }
2531    }
2532
2533    pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2534        match self {
2535            CatalogItem::Table(table) => {
2536                let create_sql = table
2537                    .create_sql
2538                    .clone()
2539                    .expect("builtin tables cannot be serialized");
2540                let mut collections = table.collections.clone();
2541                let global_id = collections
2542                    .remove(&RelationVersion::root())
2543                    .expect("at least one version");
2544                (create_sql, global_id, collections)
2545            }
2546            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2547            CatalogItem::Source(source) => {
2548                assert!(
2549                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2550                    "cannot serialize introspection/builtin sources",
2551                );
2552                let create_sql = source
2553                    .create_sql
2554                    .clone()
2555                    .expect("builtin sources cannot be serialized");
2556                (create_sql, source.global_id, BTreeMap::new())
2557            }
2558            CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2559            CatalogItem::MaterializedView(mview) => {
2560                let mut collections = mview.collections.clone();
2561                let global_id = collections
2562                    .remove(&RelationVersion::root())
2563                    .expect("at least one version");
2564                (mview.create_sql.clone(), global_id, collections)
2565            }
2566            CatalogItem::Index(index) => {
2567                (index.create_sql.clone(), index.global_id, BTreeMap::new())
2568            }
2569            CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2570            CatalogItem::Type(typ) => {
2571                let create_sql = typ
2572                    .create_sql
2573                    .clone()
2574                    .expect("builtin types cannot be serialized");
2575                (create_sql, typ.global_id, BTreeMap::new())
2576            }
2577            CatalogItem::Secret(secret) => {
2578                (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2579            }
2580            CatalogItem::Connection(connection) => (
2581                connection.create_sql.clone(),
2582                connection.global_id,
2583                BTreeMap::new(),
2584            ),
2585            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2586        }
2587    }
2588
2589    pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2590        match self {
2591            CatalogItem::Table(mut table) => {
2592                let create_sql = table
2593                    .create_sql
2594                    .expect("builtin tables cannot be serialized");
2595                let global_id = table
2596                    .collections
2597                    .remove(&RelationVersion::root())
2598                    .expect("at least one version");
2599                (create_sql, global_id, table.collections)
2600            }
2601            CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2602            CatalogItem::Source(source) => {
2603                assert!(
2604                    !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2605                    "cannot serialize introspection/builtin sources",
2606                );
2607                let create_sql = source
2608                    .create_sql
2609                    .expect("builtin sources cannot be serialized");
2610                (create_sql, source.global_id, BTreeMap::new())
2611            }
2612            CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2613            CatalogItem::MaterializedView(mut mview) => {
2614                let global_id = mview
2615                    .collections
2616                    .remove(&RelationVersion::root())
2617                    .expect("at least one version");
2618                (mview.create_sql, global_id, mview.collections)
2619            }
2620            CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2621            CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2622            CatalogItem::Type(typ) => {
2623                let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2624                (create_sql, typ.global_id, BTreeMap::new())
2625            }
2626            CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2627            CatalogItem::Connection(connection) => {
2628                (connection.create_sql, connection.global_id, BTreeMap::new())
2629            }
2630            CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2631        }
2632    }
2633
2634    /// Returns a global ID for a specific version selector. Returns `None` if the item does
2635    /// not have versions or if the version does not exist.
2636    pub fn global_id_for_version(&self, version: RelationVersionSelector) -> Option<GlobalId> {
2637        let collections = match self {
2638            CatalogItem::MaterializedView(mv) => &mv.collections,
2639            CatalogItem::Table(table) => &table.collections,
2640            CatalogItem::Source(source) => return Some(source.global_id),
2641            CatalogItem::Log(log) => return Some(log.global_id),
2642            CatalogItem::View(view) => return Some(view.global_id),
2643            CatalogItem::Sink(sink) => return Some(sink.global_id),
2644            CatalogItem::Index(index) => return Some(index.global_id),
2645            CatalogItem::Type(ty) => return Some(ty.global_id),
2646            CatalogItem::Func(func) => return Some(func.global_id),
2647            CatalogItem::Secret(secret) => return Some(secret.global_id),
2648            CatalogItem::Connection(conn) => return Some(conn.global_id),
2649        };
2650        match version {
2651            RelationVersionSelector::Latest => collections.values().last().copied(),
2652            RelationVersionSelector::Specific(version) => collections.get(&version).copied(),
2653        }
2654    }
2655}
2656
2657impl CatalogEntry {
2658    /// Reports the latest [`RelationDesc`] of the rows produced by this [`CatalogEntry`], if it
2659    /// produces rows.
2660    pub fn relation_desc_latest(&self) -> Option<Cow<'_, RelationDesc>> {
2661        self.item.relation_desc(RelationVersionSelector::Latest)
2662    }
2663
2664    /// Reports if the item has columns.
2665    pub fn has_columns(&self) -> bool {
2666        match self.item() {
2667            CatalogItem::Type(Type { details, .. }) => {
2668                matches!(details.typ, CatalogType::Record { .. })
2669            }
2670            _ => self.relation_desc_latest().is_some(),
2671        }
2672    }
2673
2674    /// Returns the [`mz_sql::func::Func`] associated with this `CatalogEntry`.
2675    pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2676        self.item.func(self)
2677    }
2678
2679    /// Returns the inner [`Index`] if this entry is an index, else `None`.
2680    pub fn index(&self) -> Option<&Index> {
2681        match self.item() {
2682            CatalogItem::Index(idx) => Some(idx),
2683            _ => None,
2684        }
2685    }
2686
2687    /// Returns the inner [`MaterializedView`] if this entry is a materialized view, else `None`.
2688    pub fn materialized_view(&self) -> Option<&MaterializedView> {
2689        match self.item() {
2690            CatalogItem::MaterializedView(mv) => Some(mv),
2691            _ => None,
2692        }
2693    }
2694
2695    /// Returns the inner [`Table`] if this entry is a table, else `None`.
2696    pub fn table(&self) -> Option<&Table> {
2697        match self.item() {
2698            CatalogItem::Table(tbl) => Some(tbl),
2699            _ => None,
2700        }
2701    }
2702
2703    /// Returns the inner [`Source`] if this entry is a source, else `None`.
2704    pub fn source(&self) -> Option<&Source> {
2705        match self.item() {
2706            CatalogItem::Source(src) => Some(src),
2707            _ => None,
2708        }
2709    }
2710
2711    /// Returns the inner [`Sink`] if this entry is a sink, else `None`.
2712    pub fn sink(&self) -> Option<&Sink> {
2713        match self.item() {
2714            CatalogItem::Sink(sink) => Some(sink),
2715            _ => None,
2716        }
2717    }
2718
2719    /// Returns the inner [`Secret`] if this entry is a secret, else `None`.
2720    pub fn secret(&self) -> Option<&Secret> {
2721        match self.item() {
2722            CatalogItem::Secret(secret) => Some(secret),
2723            _ => None,
2724        }
2725    }
2726
2727    pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2728        match self.item() {
2729            CatalogItem::Connection(connection) => Ok(connection),
2730            _ => {
2731                let db_name = match self.name().qualifiers.database_spec {
2732                    ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2733                    ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2734                };
2735                Err(SqlCatalogError::UnknownConnection(format!(
2736                    "{}{}.{}",
2737                    db_name,
2738                    self.name().qualifiers.schema_spec,
2739                    self.name().item
2740                )))
2741            }
2742        }
2743    }
2744
2745    /// Returns the [`mz_storage_types::sources::SourceDesc`] associated with
2746    /// this `CatalogEntry`, if any.
2747    pub fn source_desc(
2748        &self,
2749    ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2750        self.item.source_desc(self)
2751    }
2752
2753    /// Reports whether this catalog entry is a connection.
2754    pub fn is_connection(&self) -> bool {
2755        matches!(self.item(), CatalogItem::Connection(_))
2756    }
2757
2758    /// Reports whether this catalog entry is a table.
2759    pub fn is_table(&self) -> bool {
2760        matches!(self.item(), CatalogItem::Table(_))
2761    }
2762
2763    /// Reports whether this catalog entry is a source. Note that this includes
2764    /// subsources.
2765    pub fn is_source(&self) -> bool {
2766        matches!(self.item(), CatalogItem::Source(_))
2767    }
2768
2769    /// Reports whether this catalog entry is a subsource and, if it is, the
2770    /// ingestion it is an export of, as well as the item it exports.
2771    pub fn subsource_details(
2772        &self,
2773    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2774        match &self.item() {
2775            CatalogItem::Source(source) => match &source.data_source {
2776                DataSourceDesc::IngestionExport {
2777                    ingestion_id,
2778                    external_reference,
2779                    details,
2780                    data_config: _,
2781                } => Some((*ingestion_id, external_reference, details)),
2782                _ => None,
2783            },
2784            _ => None,
2785        }
2786    }
2787
2788    /// Reports whether this catalog entry is a source export and, if it is, the
2789    /// ingestion it is an export of, as well as the item it exports.
2790    pub fn source_export_details(
2791        &self,
2792    ) -> Option<(
2793        CatalogItemId,
2794        &UnresolvedItemName,
2795        &SourceExportDetails,
2796        &SourceExportDataConfig<ReferencedConnection>,
2797    )> {
2798        match &self.item() {
2799            CatalogItem::Source(source) => match &source.data_source {
2800                DataSourceDesc::IngestionExport {
2801                    ingestion_id,
2802                    external_reference,
2803                    details,
2804                    data_config,
2805                } => Some((*ingestion_id, external_reference, details, data_config)),
2806                _ => None,
2807            },
2808            CatalogItem::Table(table) => match &table.data_source {
2809                TableDataSource::DataSource {
2810                    desc:
2811                        DataSourceDesc::IngestionExport {
2812                            ingestion_id,
2813                            external_reference,
2814                            details,
2815                            data_config,
2816                        },
2817                    timeline: _,
2818                } => Some((*ingestion_id, external_reference, details, data_config)),
2819                _ => None,
2820            },
2821            _ => None,
2822        }
2823    }
2824
2825    /// Reports whether this catalog entry is a progress source.
2826    pub fn is_progress_source(&self) -> bool {
2827        self.item().is_progress_source()
2828    }
2829
2830    /// Returns the `GlobalId` of all of this entry's progress ID.
2831    pub fn progress_id(&self) -> Option<CatalogItemId> {
2832        match &self.item() {
2833            CatalogItem::Source(source) => match &source.data_source {
2834                DataSourceDesc::Ingestion { .. } => Some(self.id),
2835                DataSourceDesc::OldSyntaxIngestion {
2836                    progress_subsource, ..
2837                } => Some(*progress_subsource),
2838                DataSourceDesc::IngestionExport { .. }
2839                | DataSourceDesc::Introspection(_)
2840                | DataSourceDesc::Progress
2841                | DataSourceDesc::Webhook { .. }
2842                | DataSourceDesc::Catalog => None,
2843            },
2844            CatalogItem::Table(_)
2845            | CatalogItem::Log(_)
2846            | CatalogItem::View(_)
2847            | CatalogItem::MaterializedView(_)
2848            | CatalogItem::Sink(_)
2849            | CatalogItem::Index(_)
2850            | CatalogItem::Type(_)
2851            | CatalogItem::Func(_)
2852            | CatalogItem::Secret(_)
2853            | CatalogItem::Connection(_) => None,
2854        }
2855    }
2856
2857    /// Reports whether this catalog entry is a sink.
2858    pub fn is_sink(&self) -> bool {
2859        matches!(self.item(), CatalogItem::Sink(_))
2860    }
2861
2862    /// Reports whether this catalog entry is a materialized view.
2863    pub fn is_materialized_view(&self) -> bool {
2864        matches!(self.item(), CatalogItem::MaterializedView(_))
2865    }
2866
2867    /// Reports whether this catalog entry is a view.
2868    pub fn is_view(&self) -> bool {
2869        matches!(self.item(), CatalogItem::View(_))
2870    }
2871
2872    /// Reports whether this catalog entry is a secret.
2873    pub fn is_secret(&self) -> bool {
2874        matches!(self.item(), CatalogItem::Secret(_))
2875    }
2876
2877    /// Reports whether this catalog entry is an introspection source.
2878    pub fn is_introspection_source(&self) -> bool {
2879        matches!(self.item(), CatalogItem::Log(_))
2880    }
2881
2882    /// Reports whether this catalog entry is an index.
2883    pub fn is_index(&self) -> bool {
2884        matches!(self.item(), CatalogItem::Index(_))
2885    }
2886
2887    /// Reports whether this catalog entry can be treated as a relation, it can produce rows.
2888    pub fn is_relation(&self) -> bool {
2889        mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2890    }
2891
2892    /// Collects the identifiers of the objects that were encountered when
2893    /// resolving names in the item's DDL statement.
2894    pub fn references(&self) -> &ResolvedIds {
2895        self.item.references()
2896    }
2897
2898    /// Collects the identifiers of the objects used by this [`CatalogEntry`].
2899    ///
2900    /// Like [`CatalogEntry::references()`] but also includes objects that are not directly
2901    /// referenced. For example this will include any catalog objects used to implement functions
2902    /// and casts in the item.
2903    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2904        self.item.uses()
2905    }
2906
2907    /// Returns the `CatalogItem` associated with this catalog entry.
2908    pub fn item(&self) -> &CatalogItem {
2909        &self.item
2910    }
2911
2912    /// Returns a mutable reference to the `CatalogItem` associated with this
2913    /// catalog entry.
2914    pub fn item_mut(&mut self) -> &mut CatalogItem {
2915        &mut self.item
2916    }
2917
2918    /// Returns the [`CatalogItemId`] of this catalog entry.
2919    pub fn id(&self) -> CatalogItemId {
2920        self.id
2921    }
2922
2923    /// Returns all of the [`GlobalId`]s associated with this item.
2924    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2925        self.item().global_ids()
2926    }
2927
2928    pub fn latest_global_id(&self) -> GlobalId {
2929        self.item().latest_global_id()
2930    }
2931
2932    /// Returns the OID of this catalog entry.
2933    pub fn oid(&self) -> u32 {
2934        self.oid
2935    }
2936
2937    /// Returns the fully qualified name of this catalog entry.
2938    pub fn name(&self) -> &QualifiedItemName {
2939        &self.name
2940    }
2941
2942    /// Returns the identifiers of the dataflows that are directly referenced by this dataflow.
2943    pub fn referenced_by(&self) -> &[CatalogItemId] {
2944        &self.referenced_by
2945    }
2946
2947    /// Returns the identifiers of the dataflows that depend upon this dataflow.
2948    pub fn used_by(&self) -> &[CatalogItemId] {
2949        &self.used_by
2950    }
2951
2952    /// Returns the connection ID that this item belongs to, if this item is
2953    /// temporary.
2954    pub fn conn_id(&self) -> Option<&ConnectionId> {
2955        self.item.conn_id()
2956    }
2957
2958    /// Returns the role ID of the entry owner.
2959    pub fn owner_id(&self) -> &RoleId {
2960        &self.owner_id
2961    }
2962
2963    /// Returns the privileges of the entry.
2964    pub fn privileges(&self) -> &PrivilegeMap {
2965        &self.privileges
2966    }
2967
2968    /// Returns the comment object ID for this entry.
2969    pub fn comment_object_id(&self) -> CommentObjectId {
2970        use CatalogItemType::*;
2971        match self.item_type() {
2972            Table => CommentObjectId::Table(self.id),
2973            Source => CommentObjectId::Source(self.id),
2974            Sink => CommentObjectId::Sink(self.id),
2975            View => CommentObjectId::View(self.id),
2976            MaterializedView => CommentObjectId::MaterializedView(self.id),
2977            Index => CommentObjectId::Index(self.id),
2978            Func => CommentObjectId::Func(self.id),
2979            Connection => CommentObjectId::Connection(self.id),
2980            Type => CommentObjectId::Type(self.id),
2981            Secret => CommentObjectId::Secret(self.id),
2982        }
2983    }
2984}
2985
2986#[derive(Debug, Clone, Default)]
2987pub struct CommentsMap {
2988    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
2989}
2990
2991impl CommentsMap {
2992    pub fn update_comment(
2993        &mut self,
2994        object_id: CommentObjectId,
2995        sub_component: Option<usize>,
2996        comment: Option<String>,
2997    ) -> Option<String> {
2998        let object_comments = self.map.entry(object_id).or_default();
2999
3000        // Either replace the existing comment, or remove it if comment is None/NULL.
3001        let (empty, prev) = if let Some(comment) = comment {
3002            let prev = object_comments.insert(sub_component, comment);
3003            (false, prev)
3004        } else {
3005            let prev = object_comments.remove(&sub_component);
3006            (object_comments.is_empty(), prev)
3007        };
3008
3009        // Cleanup entries that are now empty.
3010        if empty {
3011            self.map.remove(&object_id);
3012        }
3013
3014        // Return the previous comment, if there was one, for easy removal.
3015        prev
3016    }
3017
3018    /// Remove all comments for `object_id` from the map.
3019    ///
3020    /// Generally there is one comment for a given [`CommentObjectId`], but in the case of
3021    /// relations you can also have comments on the individual columns. Dropping the comments for a
3022    /// relation will also drop all of the comments on any columns.
3023    pub fn drop_comments(
3024        &mut self,
3025        object_ids: &BTreeSet<CommentObjectId>,
3026    ) -> Vec<(CommentObjectId, Option<usize>, String)> {
3027        let mut removed_comments = Vec::new();
3028
3029        for object_id in object_ids {
3030            if let Some(comments) = self.map.remove(object_id) {
3031                let removed = comments
3032                    .into_iter()
3033                    .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
3034                removed_comments.extend(removed);
3035            }
3036        }
3037
3038        removed_comments
3039    }
3040
3041    pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
3042        self.map
3043            .iter()
3044            .map(|(id, comments)| {
3045                comments
3046                    .iter()
3047                    .map(|(pos, comment)| (*id, *pos, comment.as_str()))
3048            })
3049            .flatten()
3050    }
3051
3052    pub fn get_object_comments(
3053        &self,
3054        object_id: CommentObjectId,
3055    ) -> Option<&BTreeMap<Option<usize>, String>> {
3056        self.map.get(&object_id)
3057    }
3058}
3059
3060impl Serialize for CommentsMap {
3061    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
3062    where
3063        S: serde::Serializer,
3064    {
3065        let comment_count = self
3066            .map
3067            .iter()
3068            .map(|(_object_id, comments)| comments.len())
3069            .sum();
3070
3071        let mut seq = serializer.serialize_seq(Some(comment_count))?;
3072        for (object_id, sub) in &self.map {
3073            for (sub_component, comment) in sub {
3074                seq.serialize_element(&(
3075                    format!("{object_id:?}"),
3076                    format!("{sub_component:?}"),
3077                    comment,
3078                ))?;
3079            }
3080        }
3081        seq.end()
3082    }
3083}
3084
3085#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
3086pub struct DefaultPrivileges {
3087    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
3088    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
3089}
3090
3091// Use a new type here because otherwise we have two levels of BTreeMap, both needing
3092// map_key_to_string.
3093#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
3094struct RoleDefaultPrivileges(
3095    /// Denormalized, the key is the grantee Role.
3096    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
3097    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
3098);
3099
3100impl Deref for RoleDefaultPrivileges {
3101    type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
3102
3103    fn deref(&self) -> &Self::Target {
3104        &self.0
3105    }
3106}
3107
3108impl DerefMut for RoleDefaultPrivileges {
3109    fn deref_mut(&mut self) -> &mut Self::Target {
3110        &mut self.0
3111    }
3112}
3113
3114impl DefaultPrivileges {
3115    /// Add a new default privilege into the set of all default privileges.
3116    pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
3117        if privilege.acl_mode.is_empty() {
3118            return;
3119        }
3120
3121        let privileges = self.privileges.entry(object).or_default();
3122        if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3123            default_privilege.acl_mode |= privilege.acl_mode;
3124        } else {
3125            privileges.insert(privilege.grantee, privilege);
3126        }
3127    }
3128
3129    /// Revoke a default privilege from the set of all default privileges.
3130    pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
3131        if let Some(privileges) = self.privileges.get_mut(object) {
3132            if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3133                default_privilege.acl_mode =
3134                    default_privilege.acl_mode.difference(privilege.acl_mode);
3135                if default_privilege.acl_mode.is_empty() {
3136                    privileges.remove(&privilege.grantee);
3137                }
3138            }
3139            if privileges.is_empty() {
3140                self.privileges.remove(object);
3141            }
3142        }
3143    }
3144
3145    /// Get the privileges that will be granted on all objects matching `object` to `grantee`, if
3146    /// any exist.
3147    pub fn get_privileges_for_grantee(
3148        &self,
3149        object: &DefaultPrivilegeObject,
3150        grantee: &RoleId,
3151    ) -> Option<&AclMode> {
3152        self.privileges
3153            .get(object)
3154            .and_then(|privileges| privileges.get(grantee))
3155            .map(|privilege| &privilege.acl_mode)
3156    }
3157
3158    /// Get all default privileges that apply to the provided object details.
3159    pub fn get_applicable_privileges(
3160        &self,
3161        role_id: RoleId,
3162        database_id: Option<DatabaseId>,
3163        schema_id: Option<SchemaId>,
3164        object_type: mz_sql::catalog::ObjectType,
3165    ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
3166        // Privileges consider all relations to be of type table due to PostgreSQL compatibility. We
3167        // don't require the caller to worry about that and we will map their `object_type` to the
3168        // correct type for privileges.
3169        let privilege_object_type = if object_type.is_relation() {
3170            mz_sql::catalog::ObjectType::Table
3171        } else {
3172            object_type
3173        };
3174        let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
3175
3176        // Collect all entries that apply to the provided object details.
3177        // If either `database_id` or `schema_id` are `None`, then we might end up with duplicate
3178        // entries in the vec below. That's OK because we consolidate the results after.
3179        [
3180            DefaultPrivilegeObject {
3181                role_id,
3182                database_id,
3183                schema_id,
3184                object_type: privilege_object_type,
3185            },
3186            DefaultPrivilegeObject {
3187                role_id,
3188                database_id,
3189                schema_id: None,
3190                object_type: privilege_object_type,
3191            },
3192            DefaultPrivilegeObject {
3193                role_id,
3194                database_id: None,
3195                schema_id: None,
3196                object_type: privilege_object_type,
3197            },
3198            DefaultPrivilegeObject {
3199                role_id: RoleId::Public,
3200                database_id,
3201                schema_id,
3202                object_type: privilege_object_type,
3203            },
3204            DefaultPrivilegeObject {
3205                role_id: RoleId::Public,
3206                database_id,
3207                schema_id: None,
3208                object_type: privilege_object_type,
3209            },
3210            DefaultPrivilegeObject {
3211                role_id: RoleId::Public,
3212                database_id: None,
3213                schema_id: None,
3214                object_type: privilege_object_type,
3215            },
3216        ]
3217        .into_iter()
3218        .filter_map(|object| self.privileges.get(&object))
3219        .flat_map(|acl_map| acl_map.values())
3220        // Consolidate privileges with a common grantee.
3221        .fold(
3222            BTreeMap::new(),
3223            |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
3224                let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
3225                *accum_acl_mode |= *acl_mode;
3226                accum
3227            },
3228        )
3229        .into_iter()
3230        // Restrict the acl_mode to only privileges valid for the provided object type. If the
3231        // default privilege has an object type of Table, then it may contain privileges valid for
3232        // tables but not other relations. If the passed in object type is another relation, then
3233        // we need to remove any privilege that is not valid for the specified relation.
3234        .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
3235        // Filter out empty privileges.
3236        .filter(|(_, acl_mode)| !acl_mode.is_empty())
3237        .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
3238            grantee: *grantee,
3239            acl_mode,
3240        })
3241    }
3242
3243    pub fn iter(
3244        &self,
3245    ) -> impl Iterator<
3246        Item = (
3247            &DefaultPrivilegeObject,
3248            impl Iterator<Item = &DefaultPrivilegeAclItem>,
3249        ),
3250    > {
3251        self.privileges
3252            .iter()
3253            .map(|(object, acl_map)| (object, acl_map.values()))
3254    }
3255}
3256
3257#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3258pub struct ClusterConfig {
3259    pub variant: ClusterVariant,
3260    pub workload_class: Option<String>,
3261}
3262
3263impl ClusterConfig {
3264    pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
3265        match &self.variant {
3266            ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
3267            ClusterVariant::Unmanaged => None,
3268        }
3269    }
3270}
3271
3272impl From<ClusterConfig> for durable::ClusterConfig {
3273    fn from(config: ClusterConfig) -> Self {
3274        Self {
3275            variant: config.variant.into(),
3276            workload_class: config.workload_class,
3277        }
3278    }
3279}
3280
3281impl From<durable::ClusterConfig> for ClusterConfig {
3282    fn from(config: durable::ClusterConfig) -> Self {
3283        Self {
3284            variant: config.variant.into(),
3285            workload_class: config.workload_class,
3286        }
3287    }
3288}
3289
3290#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3291pub struct ClusterVariantManaged {
3292    pub size: String,
3293    pub availability_zones: Vec<String>,
3294    pub logging: ReplicaLogging,
3295    pub replication_factor: u32,
3296    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
3297    pub schedule: ClusterSchedule,
3298    /// User-configured autoscaling policy, distinct from the in-flight runtime
3299    /// records below. Shared with the durable layer, like [`ClusterSchedule`].
3300    pub auto_scaling_strategy: Option<AutoScalingStrategy>,
3301    /// In-flight graceful reconfiguration the controller is converging on.
3302    pub reconfiguration: Option<ReconfigurationState>,
3303    /// In-flight hydration burst the controller is running.
3304    pub burst: Option<BurstState>,
3305}
3306
3307impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3308    fn from(managed: ClusterVariantManaged) -> Self {
3309        // Destructure the source (no `..`): a field added to either side is a
3310        // compile error here until it's carried across the boundary.
3311        let ClusterVariantManaged {
3312            size,
3313            availability_zones,
3314            logging,
3315            replication_factor,
3316            optimizer_feature_overrides,
3317            schedule,
3318            auto_scaling_strategy,
3319            reconfiguration,
3320            burst,
3321        } = managed;
3322        Self {
3323            size,
3324            availability_zones,
3325            logging,
3326            replication_factor,
3327            optimizer_feature_overrides: optimizer_feature_overrides.into(),
3328            schedule,
3329            auto_scaling_strategy,
3330            reconfiguration: reconfiguration.map(Into::into),
3331            burst: burst.map(Into::into),
3332        }
3333    }
3334}
3335
3336impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3337    fn from(managed: durable::ClusterVariantManaged) -> Self {
3338        // Destructure the source (no `..`): a field added to either side is a
3339        // compile error here until it's carried across the boundary.
3340        let durable::ClusterVariantManaged {
3341            size,
3342            availability_zones,
3343            logging,
3344            replication_factor,
3345            optimizer_feature_overrides,
3346            schedule,
3347            auto_scaling_strategy,
3348            reconfiguration,
3349            burst,
3350        } = managed;
3351        Self {
3352            size,
3353            availability_zones,
3354            logging,
3355            replication_factor,
3356            optimizer_feature_overrides: optimizer_feature_overrides.into(),
3357            schedule,
3358            auto_scaling_strategy,
3359            reconfiguration: reconfiguration.map(Into::into),
3360            burst: burst.map(Into::into),
3361        }
3362    }
3363}
3364
3365/// In-memory mirror of [`durable::ReconfigurationState`].
3366///
3367/// This runtime state lives only in the durable layer, so the memory layer
3368/// carries its own `Serialize`/`Deserialize` mirror (to back the catalog
3369/// `dump()`) and converts across the boundary, rather than embedding the
3370/// durable-only type. The semantic contract lives on the durable type.
3371#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3372pub struct ReconfigurationState {
3373    pub target: ReconfigurationTarget,
3374    pub deadline: Timestamp,
3375    pub on_timeout: OnTimeoutAction,
3376}
3377
3378impl From<ReconfigurationState> for durable::ReconfigurationState {
3379    fn from(state: ReconfigurationState) -> Self {
3380        // Destructure the source (no `..`): a field added to either side is a
3381        // compile error here until it's carried across the boundary.
3382        let ReconfigurationState {
3383            target,
3384            deadline,
3385            on_timeout,
3386        } = state;
3387        Self {
3388            target: target.into(),
3389            deadline,
3390            on_timeout,
3391        }
3392    }
3393}
3394
3395impl From<durable::ReconfigurationState> for ReconfigurationState {
3396    fn from(state: durable::ReconfigurationState) -> Self {
3397        // Destructure the source (no `..`): a field added to either side is a
3398        // compile error here until it's carried across the boundary.
3399        let durable::ReconfigurationState {
3400            target,
3401            deadline,
3402            on_timeout,
3403        } = state;
3404        Self {
3405            target: target.into(),
3406            deadline,
3407            on_timeout,
3408        }
3409    }
3410}
3411
3412/// In-memory mirror of [`durable::ReconfigurationTarget`].
3413#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3414pub struct ReconfigurationTarget {
3415    pub size: String,
3416    pub replication_factor: u32,
3417    pub availability_zones: Vec<String>,
3418    pub logging: ReplicaLogging,
3419}
3420
3421impl From<ReconfigurationTarget> for durable::ReconfigurationTarget {
3422    fn from(target: ReconfigurationTarget) -> Self {
3423        // Destructure the source (no `..`): a field added to either side is a
3424        // compile error here until it's carried across the boundary.
3425        let ReconfigurationTarget {
3426            size,
3427            replication_factor,
3428            availability_zones,
3429            logging,
3430        } = target;
3431        Self {
3432            size,
3433            replication_factor,
3434            availability_zones,
3435            logging,
3436        }
3437    }
3438}
3439
3440impl From<durable::ReconfigurationTarget> for ReconfigurationTarget {
3441    fn from(target: durable::ReconfigurationTarget) -> Self {
3442        // Destructure the source (no `..`): a field added to either side is a
3443        // compile error here until it's carried across the boundary.
3444        let durable::ReconfigurationTarget {
3445            size,
3446            replication_factor,
3447            availability_zones,
3448            logging,
3449        } = target;
3450        Self {
3451            size,
3452            replication_factor,
3453            availability_zones,
3454            logging,
3455        }
3456    }
3457}
3458
3459/// In-memory mirror of [`durable::BurstState`].
3460#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3461pub struct BurstState {
3462    pub burst_size: String,
3463    pub linger_duration: Duration,
3464    pub steady_hydrated_at: Option<Timestamp>,
3465}
3466
3467impl From<BurstState> for durable::BurstState {
3468    fn from(burst: BurstState) -> Self {
3469        // Destructure the source (no `..`): a field added to either side is a
3470        // compile error here until it's carried across the boundary.
3471        let BurstState {
3472            burst_size,
3473            linger_duration,
3474            steady_hydrated_at,
3475        } = burst;
3476        Self {
3477            burst_size,
3478            linger_duration,
3479            steady_hydrated_at,
3480        }
3481    }
3482}
3483
3484impl From<durable::BurstState> for BurstState {
3485    fn from(burst: durable::BurstState) -> Self {
3486        // Destructure the source (no `..`): a field added to either side is a
3487        // compile error here until it's carried across the boundary.
3488        let durable::BurstState {
3489            burst_size,
3490            linger_duration,
3491            steady_hydrated_at,
3492        } = burst;
3493        Self {
3494            burst_size,
3495            linger_duration,
3496            steady_hydrated_at,
3497        }
3498    }
3499}
3500
3501#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3502pub enum ClusterVariant {
3503    Managed(ClusterVariantManaged),
3504    Unmanaged,
3505}
3506
3507impl From<ClusterVariant> for durable::ClusterVariant {
3508    fn from(variant: ClusterVariant) -> Self {
3509        match variant {
3510            ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3511            ClusterVariant::Unmanaged => Self::Unmanaged,
3512        }
3513    }
3514}
3515
3516impl From<durable::ClusterVariant> for ClusterVariant {
3517    fn from(variant: durable::ClusterVariant) -> Self {
3518        match variant {
3519            durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3520            durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3521        }
3522    }
3523}
3524
3525impl mz_sql::catalog::CatalogDatabase for Database {
3526    fn name(&self) -> &str {
3527        &self.name
3528    }
3529
3530    fn id(&self) -> DatabaseId {
3531        self.id
3532    }
3533
3534    fn has_schemas(&self) -> bool {
3535        !self.schemas_by_name.is_empty()
3536    }
3537
3538    fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
3539        &self.schemas_by_name
3540    }
3541
3542    // `as` is ok to use to cast to a trait object.
3543    #[allow(clippy::as_conversions)]
3544    fn schemas(&self) -> Vec<&dyn CatalogSchema> {
3545        self.schemas_by_id
3546            .values()
3547            .map(|schema| schema as &dyn CatalogSchema)
3548            .collect()
3549    }
3550
3551    fn owner_id(&self) -> RoleId {
3552        self.owner_id
3553    }
3554
3555    fn privileges(&self) -> &PrivilegeMap {
3556        &self.privileges
3557    }
3558}
3559
3560impl mz_sql::catalog::CatalogSchema for Schema {
3561    fn database(&self) -> &ResolvedDatabaseSpecifier {
3562        &self.name.database
3563    }
3564
3565    fn name(&self) -> &QualifiedSchemaName {
3566        &self.name
3567    }
3568
3569    fn id(&self) -> &SchemaSpecifier {
3570        &self.id
3571    }
3572
3573    fn has_items(&self) -> bool {
3574        !self.items.is_empty()
3575    }
3576
3577    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
3578        Box::new(
3579            self.items
3580                .values()
3581                .chain(self.functions.values())
3582                .chain(self.types.values())
3583                .copied(),
3584        )
3585    }
3586
3587    fn owner_id(&self) -> RoleId {
3588        self.owner_id
3589    }
3590
3591    fn privileges(&self) -> &PrivilegeMap {
3592        &self.privileges
3593    }
3594}
3595
3596impl mz_sql::catalog::CatalogRole for Role {
3597    fn name(&self) -> &str {
3598        &self.name
3599    }
3600
3601    fn id(&self) -> RoleId {
3602        self.id
3603    }
3604
3605    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
3606        &self.membership.map
3607    }
3608
3609    fn attributes(&self) -> &RoleAttributes {
3610        &self.attributes
3611    }
3612
3613    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
3614        &self.vars.map
3615    }
3616}
3617
3618impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
3619    fn name(&self) -> &str {
3620        &self.name
3621    }
3622
3623    fn id(&self) -> NetworkPolicyId {
3624        self.id
3625    }
3626
3627    fn owner_id(&self) -> RoleId {
3628        self.owner_id
3629    }
3630
3631    fn privileges(&self) -> &PrivilegeMap {
3632        &self.privileges
3633    }
3634}
3635
3636impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
3637    fn name(&self) -> &str {
3638        &self.name
3639    }
3640
3641    fn id(&self) -> ClusterId {
3642        self.id
3643    }
3644
3645    fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
3646        &self.bound_objects
3647    }
3648
3649    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
3650        &self.replica_id_by_name_
3651    }
3652
3653    // `as` is ok to use to cast to a trait object.
3654    #[allow(clippy::as_conversions)]
3655    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
3656        self.replicas()
3657            .map(|replica| replica as &dyn CatalogClusterReplica)
3658            .collect()
3659    }
3660
3661    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
3662        self.replica(id).expect("catalog out of sync")
3663    }
3664
3665    fn owner_id(&self) -> RoleId {
3666        self.owner_id
3667    }
3668
3669    fn privileges(&self) -> &PrivilegeMap {
3670        &self.privileges
3671    }
3672
3673    fn is_managed(&self) -> bool {
3674        self.is_managed()
3675    }
3676
3677    fn managed_size(&self) -> Option<&str> {
3678        match &self.config.variant {
3679            ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
3680            ClusterVariant::Unmanaged => None,
3681        }
3682    }
3683
3684    fn schedule(&self) -> Option<&ClusterSchedule> {
3685        match &self.config.variant {
3686            ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
3687            ClusterVariant::Unmanaged => None,
3688        }
3689    }
3690
3691    fn replication_factor(&self) -> Option<u32> {
3692        match &self.config.variant {
3693            ClusterVariant::Managed(ClusterVariantManaged {
3694                replication_factor, ..
3695            }) => Some(*replication_factor),
3696            ClusterVariant::Unmanaged => None,
3697        }
3698    }
3699
3700    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
3701        self.try_to_plan()
3702    }
3703}
3704
3705impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
3706    fn name(&self) -> &str {
3707        &self.name
3708    }
3709
3710    fn cluster_id(&self) -> ClusterId {
3711        self.cluster_id
3712    }
3713
3714    fn replica_id(&self) -> ReplicaId {
3715        self.replica_id
3716    }
3717
3718    fn owner_id(&self) -> RoleId {
3719        self.owner_id
3720    }
3721
3722    fn internal(&self) -> bool {
3723        self.config.location.internal()
3724    }
3725}
3726
3727impl mz_sql::catalog::CatalogItem for CatalogEntry {
3728    fn name(&self) -> &QualifiedItemName {
3729        self.name()
3730    }
3731
3732    fn id(&self) -> CatalogItemId {
3733        self.id()
3734    }
3735
3736    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3737        Box::new(self.global_ids())
3738    }
3739
3740    fn oid(&self) -> u32 {
3741        self.oid()
3742    }
3743
3744    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3745        self.func()
3746    }
3747
3748    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3749        self.source_desc()
3750    }
3751
3752    fn connection(
3753        &self,
3754    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3755    {
3756        Ok(self.connection()?.details.to_connection())
3757    }
3758
3759    fn create_sql(&self) -> &str {
3760        match self.item() {
3761            CatalogItem::Table(Table { create_sql, .. }) => {
3762                create_sql.as_deref().unwrap_or("<builtin>")
3763            }
3764            CatalogItem::Source(Source { create_sql, .. }) => {
3765                create_sql.as_deref().unwrap_or("<builtin>")
3766            }
3767            CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3768            CatalogItem::View(View { create_sql, .. }) => create_sql,
3769            CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3770            CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3771            CatalogItem::Type(Type { create_sql, .. }) => {
3772                create_sql.as_deref().unwrap_or("<builtin>")
3773            }
3774            CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3775            CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3776            CatalogItem::Func(_) => "<builtin>",
3777            CatalogItem::Log(_) => "<builtin>",
3778        }
3779    }
3780
3781    fn item_type(&self) -> SqlCatalogItemType {
3782        self.item().typ()
3783    }
3784
3785    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3786        if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3787            Some((keys, *on))
3788        } else {
3789            None
3790        }
3791    }
3792
3793    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3794        if let CatalogItem::Table(Table {
3795            data_source: TableDataSource::TableWrites { defaults },
3796            ..
3797        }) = self.item()
3798        {
3799            Some(defaults.as_slice())
3800        } else {
3801            None
3802        }
3803    }
3804
3805    fn replacement_target(&self) -> Option<CatalogItemId> {
3806        if let CatalogItem::MaterializedView(mv) = self.item() {
3807            mv.replacement_target
3808        } else {
3809            None
3810        }
3811    }
3812
3813    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3814        if let CatalogItem::Type(Type { details, .. }) = self.item() {
3815            Some(details)
3816        } else {
3817            None
3818        }
3819    }
3820
3821    fn references(&self) -> &ResolvedIds {
3822        self.references()
3823    }
3824
3825    fn uses(&self) -> BTreeSet<CatalogItemId> {
3826        self.uses()
3827    }
3828
3829    fn referenced_by(&self) -> &[CatalogItemId] {
3830        self.referenced_by()
3831    }
3832
3833    fn used_by(&self) -> &[CatalogItemId] {
3834        self.used_by()
3835    }
3836
3837    fn subsource_details(
3838        &self,
3839    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3840        self.subsource_details()
3841    }
3842
3843    fn source_export_details(
3844        &self,
3845    ) -> Option<(
3846        CatalogItemId,
3847        &UnresolvedItemName,
3848        &SourceExportDetails,
3849        &SourceExportDataConfig<ReferencedConnection>,
3850    )> {
3851        self.source_export_details()
3852    }
3853
3854    fn is_progress_source(&self) -> bool {
3855        self.is_progress_source()
3856    }
3857
3858    fn progress_id(&self) -> Option<CatalogItemId> {
3859        self.progress_id()
3860    }
3861
3862    fn owner_id(&self) -> RoleId {
3863        self.owner_id
3864    }
3865
3866    fn privileges(&self) -> &PrivilegeMap {
3867        &self.privileges
3868    }
3869
3870    fn cluster_id(&self) -> Option<ClusterId> {
3871        self.item().cluster_id()
3872    }
3873
3874    fn at_version(
3875        &self,
3876        version: RelationVersionSelector,
3877    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3878        Box::new(CatalogCollectionEntry {
3879            entry: self.clone(),
3880            version,
3881        })
3882    }
3883
3884    fn latest_version(&self) -> Option<RelationVersion> {
3885        self.table().map(|t| t.desc.latest_version())
3886    }
3887}
3888
3889/// A single update to the catalog state.
3890#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
3891pub struct StateUpdate {
3892    pub kind: StateUpdateKind,
3893    pub ts: Timestamp,
3894    pub diff: StateDiff,
3895}
3896
3897/// The contents of a single state update.
3898///
3899/// Variants are listed in dependency order.
3900#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
3901pub enum StateUpdateKind {
3902    Role(durable::objects::Role),
3903    RoleAuth(durable::objects::RoleAuth),
3904    Database(durable::objects::Database),
3905    Schema(durable::objects::Schema),
3906    DefaultPrivilege(durable::objects::DefaultPrivilege),
3907    SystemPrivilege(MzAclItem),
3908    SystemConfiguration(durable::objects::SystemConfiguration),
3909    Cluster(durable::objects::Cluster),
3910    ClusterSystemConfiguration(durable::objects::ClusterSystemConfiguration),
3911    NetworkPolicy(durable::objects::NetworkPolicy),
3912    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3913    ClusterReplica(durable::objects::ClusterReplica),
3914    ReplicaSystemConfiguration(durable::objects::ReplicaSystemConfiguration),
3915    SourceReferences(durable::objects::SourceReferences),
3916    SystemObjectMapping(durable::objects::SystemObjectMapping),
3917    // Temporary items are not actually updated via the durable catalog, but
3918    // this allows us to model them the same way as all other items in parts of
3919    // the pipeline.
3920    TemporaryItem(TemporaryItem),
3921    Item(durable::objects::Item),
3922    Comment(durable::objects::Comment),
3923    AuditLog(durable::objects::AuditLog),
3924    // Storage updates.
3925    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3926    UnfinalizedShard(durable::objects::UnfinalizedShard),
3927}
3928
3929/// Valid diffs for catalog state updates.
3930#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
3931pub enum StateDiff {
3932    Retraction,
3933    Addition,
3934}
3935
3936impl From<StateDiff> for Diff {
3937    fn from(diff: StateDiff) -> Self {
3938        match diff {
3939            StateDiff::Retraction => Diff::MINUS_ONE,
3940            StateDiff::Addition => Diff::ONE,
3941        }
3942    }
3943}
3944impl TryFrom<Diff> for StateDiff {
3945    type Error = String;
3946
3947    fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3948        match diff {
3949            Diff::MINUS_ONE => Ok(Self::Retraction),
3950            Diff::ONE => Ok(Self::Addition),
3951            diff => Err(format!("invalid diff {diff}")),
3952        }
3953    }
3954}
3955
3956/// Information needed to process an update to a temporary item.
3957#[derive(Debug, Clone, Ord, PartialOrd, PartialEq, Eq)]
3958pub struct TemporaryItem {
3959    pub id: CatalogItemId,
3960    pub oid: u32,
3961    pub global_id: GlobalId,
3962    pub schema_id: SchemaId,
3963    pub name: String,
3964    pub conn_id: Option<ConnectionId>,
3965    pub create_sql: String,
3966    pub owner_id: RoleId,
3967    pub privileges: Vec<MzAclItem>,
3968    pub extra_versions: BTreeMap<RelationVersion, GlobalId>,
3969}
3970
3971impl From<CatalogEntry> for TemporaryItem {
3972    fn from(entry: CatalogEntry) -> Self {
3973        let conn_id = entry.conn_id().cloned();
3974        let (create_sql, global_id, extra_versions) = entry.item.to_serialized();
3975
3976        TemporaryItem {
3977            id: entry.id,
3978            oid: entry.oid,
3979            global_id,
3980            schema_id: entry.name.qualifiers.schema_spec.into(),
3981            name: entry.name.item,
3982            conn_id,
3983            create_sql,
3984            owner_id: entry.owner_id,
3985            privileges: entry.privileges.into_all_values().collect(),
3986            extra_versions,
3987        }
3988    }
3989}
3990
3991impl TemporaryItem {
3992    pub fn item_type(&self) -> CatalogItemType {
3993        item_type(&self.create_sql)
3994    }
3995}
3996
3997/// The same as [`StateUpdateKind`], but without `TemporaryItem` so we can derive [`Ord`].
3998#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
3999pub enum BootstrapStateUpdateKind {
4000    Role(durable::objects::Role),
4001    RoleAuth(durable::objects::RoleAuth),
4002    Database(durable::objects::Database),
4003    Schema(durable::objects::Schema),
4004    DefaultPrivilege(durable::objects::DefaultPrivilege),
4005    SystemPrivilege(MzAclItem),
4006    SystemConfiguration(durable::objects::SystemConfiguration),
4007    Cluster(durable::objects::Cluster),
4008    ClusterSystemConfiguration(durable::objects::ClusterSystemConfiguration),
4009    NetworkPolicy(durable::objects::NetworkPolicy),
4010    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
4011    ClusterReplica(durable::objects::ClusterReplica),
4012    ReplicaSystemConfiguration(durable::objects::ReplicaSystemConfiguration),
4013    SourceReferences(durable::objects::SourceReferences),
4014    SystemObjectMapping(durable::objects::SystemObjectMapping),
4015    Item(durable::objects::Item),
4016    Comment(durable::objects::Comment),
4017    AuditLog(durable::objects::AuditLog),
4018    // Storage updates.
4019    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
4020    UnfinalizedShard(durable::objects::UnfinalizedShard),
4021}
4022
4023impl From<BootstrapStateUpdateKind> for StateUpdateKind {
4024    fn from(value: BootstrapStateUpdateKind) -> Self {
4025        match value {
4026            BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
4027            BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
4028            BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
4029            BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
4030            BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
4031                StateUpdateKind::DefaultPrivilege(kind)
4032            }
4033            BootstrapStateUpdateKind::SystemPrivilege(kind) => {
4034                StateUpdateKind::SystemPrivilege(kind)
4035            }
4036            BootstrapStateUpdateKind::SystemConfiguration(kind) => {
4037                StateUpdateKind::SystemConfiguration(kind)
4038            }
4039            BootstrapStateUpdateKind::ClusterSystemConfiguration(kind) => {
4040                StateUpdateKind::ClusterSystemConfiguration(kind)
4041            }
4042            BootstrapStateUpdateKind::ReplicaSystemConfiguration(kind) => {
4043                StateUpdateKind::ReplicaSystemConfiguration(kind)
4044            }
4045            BootstrapStateUpdateKind::SourceReferences(kind) => {
4046                StateUpdateKind::SourceReferences(kind)
4047            }
4048            BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
4049            BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
4050            BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
4051                StateUpdateKind::IntrospectionSourceIndex(kind)
4052            }
4053            BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
4054            BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
4055                StateUpdateKind::SystemObjectMapping(kind)
4056            }
4057            BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
4058            BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
4059            BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
4060            BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
4061                StateUpdateKind::StorageCollectionMetadata(kind)
4062            }
4063            BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
4064                StateUpdateKind::UnfinalizedShard(kind)
4065            }
4066        }
4067    }
4068}
4069
4070impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
4071    type Error = TemporaryItem;
4072
4073    fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
4074        match value {
4075            StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
4076            StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
4077            StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
4078            StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
4079            StateUpdateKind::DefaultPrivilege(kind) => {
4080                Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
4081            }
4082            StateUpdateKind::SystemPrivilege(kind) => {
4083                Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
4084            }
4085            StateUpdateKind::SystemConfiguration(kind) => {
4086                Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
4087            }
4088            StateUpdateKind::ClusterSystemConfiguration(kind) => {
4089                Ok(BootstrapStateUpdateKind::ClusterSystemConfiguration(kind))
4090            }
4091            StateUpdateKind::ReplicaSystemConfiguration(kind) => {
4092                Ok(BootstrapStateUpdateKind::ReplicaSystemConfiguration(kind))
4093            }
4094            StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
4095            StateUpdateKind::NetworkPolicy(kind) => {
4096                Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
4097            }
4098            StateUpdateKind::IntrospectionSourceIndex(kind) => {
4099                Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
4100            }
4101            StateUpdateKind::ClusterReplica(kind) => {
4102                Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
4103            }
4104            StateUpdateKind::SourceReferences(kind) => {
4105                Ok(BootstrapStateUpdateKind::SourceReferences(kind))
4106            }
4107            StateUpdateKind::SystemObjectMapping(kind) => {
4108                Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
4109            }
4110            StateUpdateKind::TemporaryItem(kind) => Err(kind),
4111            StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
4112            StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
4113            StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
4114            StateUpdateKind::StorageCollectionMetadata(kind) => {
4115                Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
4116            }
4117            StateUpdateKind::UnfinalizedShard(kind) => {
4118                Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
4119            }
4120        }
4121    }
4122}