1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_compute_client::logging::LogVariant;
24use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
25use mz_controller_types::{ClusterId, ReplicaId};
26use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
27use mz_ore::collections::CollectionExt;
28use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
29use mz_repr::network_policy_id::NetworkPolicyId;
30use mz_repr::optimize::OptimizerFeatureOverrides;
31use mz_repr::refresh_schedule::RefreshSchedule;
32use mz_repr::role_id::RoleId;
33use mz_repr::{
34 CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
35 RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
36};
37use mz_sql::ast::display::AstDisplay;
38use mz_sql::ast::{
39 ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
40 UnresolvedItemName, Value, WithOptionValue,
41};
42use mz_sql::catalog::{
43 CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
44 CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogType,
45 CatalogTypeDetails, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference,
46 RoleAttributes, RoleMembership, RoleVars, SystemObjectType,
47};
48use mz_sql::names::{
49 Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
50 QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
51};
52use mz_sql::plan::{
53 ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
54 CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
55 HirRelationExpr, NetworkPolicyRule, PlanError, WebhookBodyFormat, WebhookHeaders,
56 WebhookValidation,
57};
58use mz_sql::rbac;
59use mz_sql::session::vars::OwnedVarInput;
60use mz_storage_client::controller::IntrospectionType;
61use mz_storage_types::connections::inline::ReferencedConnection;
62use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
63use mz_storage_types::sources::load_generator::LoadGenerator;
64use mz_storage_types::sources::{
65 GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
66 SourceExportDetails, Timeline,
67};
68use serde::ser::SerializeSeq;
69use serde::{Deserialize, Serialize};
70use timely::progress::Antichain;
71use tracing::debug;
72
73use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
74use crate::durable;
75use crate::durable::objects::item_type;
76
/// Refreshes an existing value in place from another value of type `T`.
///
/// The `From<T>` supertrait bound means every implementor can also be built
/// from scratch out of the same input type.
pub trait UpdateFrom<T>: From<T> {
    /// Updates `self` using `from`, consuming `from` in the process.
    fn update_from(&mut self, from: T);
}
81
/// Catalog representation of a database, together with lookup maps for its
/// schemas.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    /// Name of the database.
    pub name: String,
    /// Identifier of the database.
    pub id: DatabaseId,
    /// OID assigned to the database.
    pub oid: u32,
    /// Schemas keyed by [`SchemaId`]; the keys are serialized as strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Maps a schema name to its [`SchemaId`].
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    /// Role that owns the database.
    pub owner_id: RoleId,
    /// Privileges granted on the database.
    pub privileges: PrivilegeMap,
}
93
94impl From<Database> for durable::Database {
95 fn from(database: Database) -> durable::Database {
96 durable::Database {
97 id: database.id,
98 oid: database.oid,
99 name: database.name,
100 owner_id: database.owner_id,
101 privileges: database.privileges.into_all_values().collect(),
102 }
103 }
104}
105
106impl From<durable::Database> for Database {
107 fn from(
108 durable::Database {
109 id,
110 oid,
111 name,
112 owner_id,
113 privileges,
114 }: durable::Database,
115 ) -> Database {
116 Database {
117 id,
118 oid,
119 schemas_by_id: BTreeMap::new(),
120 schemas_by_name: BTreeMap::new(),
121 name,
122 owner_id,
123 privileges: PrivilegeMap::from_mz_acl_items(privileges),
124 }
125 }
126}
127
128impl UpdateFrom<durable::Database> for Database {
129 fn update_from(
130 &mut self,
131 durable::Database {
132 id,
133 oid,
134 name,
135 owner_id,
136 privileges,
137 }: durable::Database,
138 ) {
139 self.id = id;
140 self.oid = oid;
141 self.name = name;
142 self.owner_id = owner_id;
143 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
144 }
145}
146
/// Catalog representation of a schema, together with name lookup maps for the
/// objects it contains.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
    /// Qualified name of the schema (database plus schema name).
    pub name: QualifiedSchemaName,
    /// Identifier of the schema.
    pub id: SchemaSpecifier,
    /// OID assigned to the schema.
    pub oid: u32,
    /// Maps a name to a [`CatalogItemId`].
    // NOTE(review): presumably covers items that are neither functions nor
    // types, which have their own maps below -- confirm against the callers.
    pub items: BTreeMap<String, CatalogItemId>,
    /// Maps a function name to its [`CatalogItemId`].
    pub functions: BTreeMap<String, CatalogItemId>,
    /// Maps a type name to its [`CatalogItemId`].
    pub types: BTreeMap<String, CatalogItemId>,
    /// Role that owns the schema.
    pub owner_id: RoleId,
    /// Privileges granted on the schema.
    pub privileges: PrivilegeMap,
}
158
159impl From<Schema> for durable::Schema {
160 fn from(schema: Schema) -> durable::Schema {
161 durable::Schema {
162 id: schema.id.into(),
163 oid: schema.oid,
164 name: schema.name.schema,
165 database_id: schema.name.database.id(),
166 owner_id: schema.owner_id,
167 privileges: schema.privileges.into_all_values().collect(),
168 }
169 }
170}
171
172impl From<durable::Schema> for Schema {
173 fn from(
174 durable::Schema {
175 id,
176 oid,
177 name,
178 database_id,
179 owner_id,
180 privileges,
181 }: durable::Schema,
182 ) -> Schema {
183 Schema {
184 name: QualifiedSchemaName {
185 database: database_id.into(),
186 schema: name,
187 },
188 id: id.into(),
189 oid,
190 items: BTreeMap::new(),
191 functions: BTreeMap::new(),
192 types: BTreeMap::new(),
193 owner_id,
194 privileges: PrivilegeMap::from_mz_acl_items(privileges),
195 }
196 }
197}
198
199impl UpdateFrom<durable::Schema> for Schema {
200 fn update_from(
201 &mut self,
202 durable::Schema {
203 id,
204 oid,
205 name,
206 database_id,
207 owner_id,
208 privileges,
209 }: durable::Schema,
210 ) {
211 self.name = QualifiedSchemaName {
212 database: database_id.into(),
213 schema: name,
214 };
215 self.id = id.into();
216 self.oid = oid;
217 self.owner_id = owner_id;
218 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
219 }
220}
221
/// Catalog representation of a role.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    /// Name of the role.
    pub name: String,
    /// Identifier of the role.
    pub id: RoleId,
    /// OID assigned to the role.
    pub oid: u32,
    /// Attributes of the role.
    pub attributes: RoleAttributes,
    /// Membership information for the role.
    pub membership: RoleMembership,
    /// Variables stored on the role; iterate them with [`Role::vars`].
    pub vars: RoleVars,
}
231
232impl Role {
233 pub fn is_user(&self) -> bool {
234 self.id.is_user()
235 }
236
237 pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
238 self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
239 }
240}
241
242impl From<Role> for durable::Role {
243 fn from(role: Role) -> durable::Role {
244 durable::Role {
245 id: role.id,
246 oid: role.oid,
247 name: role.name,
248 attributes: role.attributes,
249 membership: role.membership,
250 vars: role.vars,
251 }
252 }
253}
254
255impl From<durable::Role> for Role {
256 fn from(
257 durable::Role {
258 id,
259 oid,
260 name,
261 attributes,
262 membership,
263 vars,
264 }: durable::Role,
265 ) -> Self {
266 Role {
267 name,
268 id,
269 oid,
270 attributes,
271 membership,
272 vars,
273 }
274 }
275}
276
277impl UpdateFrom<durable::Role> for Role {
278 fn update_from(
279 &mut self,
280 durable::Role {
281 id,
282 oid,
283 name,
284 attributes,
285 membership,
286 vars,
287 }: durable::Role,
288 ) {
289 self.id = id;
290 self.oid = oid;
291 self.name = name;
292 self.attributes = attributes;
293 self.membership = membership;
294 self.vars = vars;
295 }
296}
297
/// Catalog representation of a role's authentication state.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RoleAuth {
    /// Role this authentication state belongs to.
    pub role_id: RoleId,
    /// Password hash for the role, if one is set.
    pub password_hash: Option<String>,
    /// Timestamp of the last update.
    // NOTE(review): the unit/epoch of this `u64` is not visible here -- confirm.
    pub updated_at: u64,
}
304
305impl From<RoleAuth> for durable::RoleAuth {
306 fn from(role_auth: RoleAuth) -> durable::RoleAuth {
307 durable::RoleAuth {
308 role_id: role_auth.role_id,
309 password_hash: role_auth.password_hash,
310 updated_at: role_auth.updated_at,
311 }
312 }
313}
314
315impl From<durable::RoleAuth> for RoleAuth {
316 fn from(
317 durable::RoleAuth {
318 role_id,
319 password_hash,
320 updated_at,
321 }: durable::RoleAuth,
322 ) -> RoleAuth {
323 RoleAuth {
324 role_id,
325 password_hash,
326 updated_at,
327 }
328 }
329}
330
331impl UpdateFrom<durable::RoleAuth> for RoleAuth {
332 fn update_from(&mut self, from: durable::RoleAuth) {
333 self.role_id = from.role_id;
334 self.password_hash = from.password_hash;
335 }
336}
337
/// Catalog representation of a cluster, together with lookup maps for its
/// replicas.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    /// Name of the cluster.
    pub name: String,
    /// Identifier of the cluster.
    pub id: ClusterId,
    /// Configuration of the cluster.
    pub config: ClusterConfig,
    /// Maps each [`LogVariant`] to a [`GlobalId`]. Skipped during
    /// serialization.
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    /// IDs of the catalog items bound to this cluster.
    pub bound_objects: BTreeSet<CatalogItemId>,
    /// Maps a replica name to its [`ReplicaId`]; read through
    /// [`Cluster::replica_id`].
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    /// Replicas keyed by [`ReplicaId`] (keys are serialized as strings); read
    /// through [`Cluster::replicas`] and [`Cluster::replica`].
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    /// Role that owns the cluster.
    pub owner_id: RoleId,
    /// Privileges granted on the cluster.
    pub privileges: PrivilegeMap,
}
354
355impl Cluster {
356 pub fn role(&self) -> ClusterRole {
358 if self.name == MZ_SYSTEM_CLUSTER.name {
361 ClusterRole::SystemCritical
362 } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
363 ClusterRole::System
364 } else {
365 ClusterRole::User
366 }
367 }
368
369 pub fn is_managed(&self) -> bool {
371 matches!(self.config.variant, ClusterVariant::Managed { .. })
372 }
373
374 pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
376 self.replicas().filter(|r| !r.config.location.internal())
377 }
378
379 pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
381 self.replicas_by_id_.values()
382 }
383
384 pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
386 self.replicas_by_id_.get(&replica_id)
387 }
388
389 pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
391 self.replica_id_by_name_.get(name).copied()
392 }
393
394 pub fn availability_zones(&self) -> Option<&[String]> {
396 match &self.config.variant {
397 ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
398 ClusterVariant::Unmanaged => None,
399 }
400 }
401
402 pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
403 let name = self.name.clone();
404 let variant = match &self.config.variant {
405 ClusterVariant::Managed(ClusterVariantManaged {
406 size,
407 availability_zones,
408 logging,
409 replication_factor,
410 optimizer_feature_overrides,
411 schedule,
412 }) => {
413 let introspection = match logging {
414 ReplicaLogging {
415 log_logging,
416 interval: Some(interval),
417 } => Some(ComputeReplicaIntrospectionConfig {
418 debugging: *log_logging,
419 interval: interval.clone(),
420 }),
421 ReplicaLogging {
422 log_logging: _,
423 interval: None,
424 } => None,
425 };
426 let compute = ComputeReplicaConfig { introspection };
427 CreateClusterVariant::Managed(CreateClusterManagedPlan {
428 replication_factor: replication_factor.clone(),
429 size: size.clone(),
430 availability_zones: availability_zones.clone(),
431 compute,
432 optimizer_feature_overrides: optimizer_feature_overrides.clone(),
433 schedule: schedule.clone(),
434 })
435 }
436 ClusterVariant::Unmanaged => {
437 return Err(PlanError::Unsupported {
440 feature: "SHOW CREATE for unmanaged clusters".to_string(),
441 discussion_no: None,
442 });
443 }
444 };
445 let workload_class = self.config.workload_class.clone();
446 Ok(CreateClusterPlan {
447 name,
448 variant,
449 workload_class,
450 })
451 }
452}
453
454impl From<Cluster> for durable::Cluster {
455 fn from(cluster: Cluster) -> durable::Cluster {
456 durable::Cluster {
457 id: cluster.id,
458 name: cluster.name,
459 owner_id: cluster.owner_id,
460 privileges: cluster.privileges.into_all_values().collect(),
461 config: cluster.config.into(),
462 }
463 }
464}
465
466impl From<durable::Cluster> for Cluster {
467 fn from(
468 durable::Cluster {
469 id,
470 name,
471 owner_id,
472 privileges,
473 config,
474 }: durable::Cluster,
475 ) -> Self {
476 Cluster {
477 name: name.clone(),
478 id,
479 bound_objects: BTreeSet::new(),
480 log_indexes: BTreeMap::new(),
481 replica_id_by_name_: BTreeMap::new(),
482 replicas_by_id_: BTreeMap::new(),
483 owner_id,
484 privileges: PrivilegeMap::from_mz_acl_items(privileges),
485 config: config.into(),
486 }
487 }
488}
489
490impl UpdateFrom<durable::Cluster> for Cluster {
491 fn update_from(
492 &mut self,
493 durable::Cluster {
494 id,
495 name,
496 owner_id,
497 privileges,
498 config,
499 }: durable::Cluster,
500 ) {
501 self.id = id;
502 self.name = name;
503 self.owner_id = owner_id;
504 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
505 self.config = config.into();
506 }
507}
508
/// Catalog representation of a single cluster replica.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    /// Name of the replica.
    pub name: String,
    /// Cluster this replica belongs to.
    pub cluster_id: ClusterId,
    /// Identifier of the replica.
    pub replica_id: ReplicaId,
    /// Configuration of the replica.
    pub config: ReplicaConfig,
    /// Role that owns the replica.
    pub owner_id: RoleId,
}
517
518impl From<ClusterReplica> for durable::ClusterReplica {
519 fn from(replica: ClusterReplica) -> durable::ClusterReplica {
520 durable::ClusterReplica {
521 cluster_id: replica.cluster_id,
522 replica_id: replica.replica_id,
523 name: replica.name,
524 config: replica.config.into(),
525 owner_id: replica.owner_id,
526 }
527 }
528}
529
/// A status value for a cluster replica process, paired with a UTC timestamp.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    /// The reported status.
    pub status: ClusterStatus,
    /// Timestamp associated with the status.
    // NOTE(review): presumably the time the status was observed -- confirm.
    pub time: DateTime<Utc>,
}
535
/// A list of [`SourceReference`]s together with the time it was last updated.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
    /// Timestamp of the last update.
    // NOTE(review): the unit/epoch of this `u64` is not visible here -- confirm.
    pub updated_at: u64,
    /// The individual references.
    pub references: Vec<SourceReference>,
}
541
/// A single named reference belonging to a source, with an optional namespace
/// and a list of column names.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
    /// Name of the referenced object.
    pub name: String,
    /// Namespace of the referenced object, if it has one.
    pub namespace: Option<String>,
    /// Column names of the referenced object.
    pub columns: Vec<String>,
}
548
549impl From<SourceReference> for durable::SourceReference {
550 fn from(source_reference: SourceReference) -> durable::SourceReference {
551 durable::SourceReference {
552 name: source_reference.name,
553 namespace: source_reference.namespace,
554 columns: source_reference.columns,
555 }
556 }
557}
558
559impl SourceReferences {
560 pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
561 durable::SourceReferences {
562 source_id,
563 updated_at: self.updated_at,
564 references: self.references.into_iter().map(Into::into).collect(),
565 }
566 }
567}
568
569impl From<durable::SourceReference> for SourceReference {
570 fn from(source_reference: durable::SourceReference) -> SourceReference {
571 SourceReference {
572 name: source_reference.name,
573 namespace: source_reference.namespace,
574 columns: source_reference.columns,
575 }
576 }
577}
578
579impl From<durable::SourceReferences> for SourceReferences {
580 fn from(source_references: durable::SourceReferences) -> SourceReferences {
581 SourceReferences {
582 updated_at: source_references.updated_at,
583 references: source_references
584 .references
585 .into_iter()
586 .map(|source_reference| source_reference.into())
587 .collect(),
588 }
589 }
590}
591
592impl From<mz_sql::plan::SourceReference> for SourceReference {
593 fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
594 SourceReference {
595 name: source_reference.name,
596 namespace: source_reference.namespace,
597 columns: source_reference.columns,
598 }
599 }
600}
601
602impl From<mz_sql::plan::SourceReferences> for SourceReferences {
603 fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
604 SourceReferences {
605 updated_at: source_references.updated_at,
606 references: source_references
607 .references
608 .into_iter()
609 .map(|source_reference| source_reference.into())
610 .collect(),
611 }
612 }
613}
614
615impl From<SourceReferences> for mz_sql::plan::SourceReferences {
616 fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
617 mz_sql::plan::SourceReferences {
618 updated_at: source_references.updated_at,
619 references: source_references
620 .references
621 .into_iter()
622 .map(|source_reference| source_reference.into())
623 .collect(),
624 }
625 }
626}
627
628impl From<SourceReference> for mz_sql::plan::SourceReference {
629 fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
630 mz_sql::plan::SourceReference {
631 name: source_reference.name,
632 namespace: source_reference.namespace,
633 columns: source_reference.columns,
634 }
635 }
636}
637
/// A catalog item together with its identity, ownership, privileges and
/// reverse-dependency lists.
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    /// The item itself.
    pub item: CatalogItem,
    /// IDs of catalog items recorded as referencing this entry. Skipped
    /// during serialization.
    #[serde(skip)]
    pub referenced_by: Vec<CatalogItemId>,
    /// IDs of catalog items recorded as using this entry. Skipped during
    /// serialization.
    // NOTE(review): how "used by" differs from "referenced by" is not visible
    // in this part of the file -- confirm before relying on either.
    #[serde(skip)]
    pub used_by: Vec<CatalogItemId>,
    /// Identifier of the entry.
    pub id: CatalogItemId,
    /// OID assigned to the entry.
    pub oid: u32,
    /// Fully qualified name of the entry.
    pub name: QualifiedItemName,
    /// Role that owns the entry.
    pub owner_id: RoleId,
    /// Privileges granted on the entry.
    pub privileges: PrivilegeMap,
}
654
/// A [`CatalogEntry`] paired with a [`RelationVersionSelector`].
///
/// The selector is passed along whenever the entry's relation description or
/// [`GlobalId`] is looked up through this wrapper.
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
    /// The wrapped catalog entry.
    pub entry: CatalogEntry,
    /// The version selector applied to version-dependent lookups.
    pub version: RelationVersionSelector,
}
677
678impl CatalogCollectionEntry {
679 pub fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
680 self.item().relation_desc(self.version)
681 }
682}
683
684impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
685 fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
686 self.item().relation_desc(self.version)
687 }
688
689 fn global_id(&self) -> GlobalId {
690 self.entry
691 .item()
692 .global_id_for_version(self.version)
693 .expect("catalog corruption, missing version!")
694 }
695}
696
697impl Deref for CatalogCollectionEntry {
698 type Target = CatalogEntry;
699
700 fn deref(&self) -> &CatalogEntry {
701 &self.entry
702 }
703}
704
/// Implements the SQL layer's `CatalogItem` trait by forwarding to the wrapped
/// [`CatalogEntry`].
///
/// Every method delegates to `self.entry`; only [`Self::at_version`] builds a
/// new value.
impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
    /// Forwards to the wrapped entry's `name`.
    fn name(&self) -> &QualifiedItemName {
        self.entry.name()
    }

    /// Forwards to the wrapped entry's `id`.
    fn id(&self) -> CatalogItemId {
        self.entry.id()
    }

    /// Forwards to the wrapped entry's `global_ids`, boxing the iterator.
    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.entry.global_ids())
    }

    /// Forwards to the wrapped entry's `oid`.
    fn oid(&self) -> u32 {
        self.entry.oid()
    }

    /// Forwards to the wrapped entry's `func`.
    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.entry.func()
    }

    /// Forwards to the wrapped entry's `source_desc`.
    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.entry.source_desc()
    }

    /// Forwards to the wrapped entry's implementation of the trait's
    /// `connection` method, selected explicitly with a fully qualified call.
    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        mz_sql::catalog::CatalogItem::connection(&self.entry)
    }

    /// Forwards to the wrapped entry's `create_sql`.
    fn create_sql(&self) -> &str {
        self.entry.create_sql()
    }

    /// Forwards to the wrapped entry's `item_type`.
    fn item_type(&self) -> SqlCatalogItemType {
        self.entry.item_type()
    }

    /// Forwards to the wrapped entry's `index_details`.
    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        self.entry.index_details()
    }

    /// Forwards to the wrapped entry's `writable_table_details`.
    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        self.entry.writable_table_details()
    }

    /// Forwards to the wrapped entry's `replacement_target`.
    fn replacement_target(&self) -> Option<CatalogItemId> {
        self.entry.replacement_target()
    }

    /// Forwards to the wrapped entry's `type_details`.
    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        self.entry.type_details()
    }

    /// Forwards to the wrapped entry's `references`.
    fn references(&self) -> &ResolvedIds {
        self.entry.references()
    }

    /// Forwards to the wrapped entry's `uses`.
    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.entry.uses()
    }

    /// Forwards to the wrapped entry's `referenced_by`.
    fn referenced_by(&self) -> &[CatalogItemId] {
        self.entry.referenced_by()
    }

    /// Forwards to the wrapped entry's `used_by`.
    fn used_by(&self) -> &[CatalogItemId] {
        self.entry.used_by()
    }

    /// Forwards to the wrapped entry's `subsource_details`.
    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.entry.subsource_details()
    }

    /// Forwards to the wrapped entry's `source_export_details`.
    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.entry.source_export_details()
    }

    /// Forwards to the wrapped entry's `is_progress_source`.
    fn is_progress_source(&self) -> bool {
        self.entry.is_progress_source()
    }

    /// Forwards to the wrapped entry's `progress_id`.
    fn progress_id(&self) -> Option<CatalogItemId> {
        self.entry.progress_id()
    }

    /// Returns a copy of the owner ID reported by the wrapped entry.
    fn owner_id(&self) -> RoleId {
        *self.entry.owner_id()
    }

    /// Forwards to the wrapped entry's `privileges`.
    fn privileges(&self) -> &PrivilegeMap {
        self.entry.privileges()
    }

    /// Forwards to the `cluster_id` of the wrapped entry's item.
    fn cluster_id(&self) -> Option<ClusterId> {
        self.entry.item().cluster_id()
    }

    /// Returns a boxed [`CatalogCollectionEntry`] that wraps a clone of this
    /// entry's [`CatalogEntry`] and uses `version` as its selector.
    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        Box::new(CatalogCollectionEntry {
            entry: self.entry.clone(),
            version,
        })
    }

    /// Forwards to the wrapped entry's `latest_version`.
    fn latest_version(&self) -> Option<RelationVersion> {
        self.entry.latest_version()
    }
}
828
/// The kinds of item a catalog entry can hold, one variant per item kind.
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    /// A table.
    Table(Table),
    /// A source.
    Source(Source),
    /// A log, identified by its `LogVariant`.
    Log(Log),
    /// A view.
    View(View),
    /// A materialized view.
    MaterializedView(MaterializedView),
    /// A sink.
    Sink(Sink),
    /// An index.
    Index(Index),
    /// A type.
    Type(Type),
    /// A function.
    Func(Func),
    /// A secret.
    Secret(Secret),
    /// A connection.
    Connection(Connection),
    /// A continual task.
    ContinualTask(ContinualTask),
}
844
845impl From<CatalogEntry> for durable::Item {
846 fn from(entry: CatalogEntry) -> durable::Item {
847 let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
848 durable::Item {
849 id: entry.id,
850 oid: entry.oid,
851 global_id,
852 schema_id: entry.name.qualifiers.schema_spec.into(),
853 name: entry.name.item,
854 create_sql,
855 owner_id: entry.owner_id,
856 privileges: entry.privileges.into_all_values().collect(),
857 extra_versions,
858 }
859 }
860}
861
/// Catalog representation of a table.
#[derive(Debug, Clone, Serialize)]
pub struct Table {
    /// SQL that defines this table, if available.
    pub create_sql: Option<String>,
    /// Versioned description of the table's relation.
    pub desc: VersionedRelationDesc,
    /// Maps each [`RelationVersion`] of the table to its [`GlobalId`]. Keys
    /// are serialized as strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// Optional connection ID associated with the table. Skipped during
    /// serialization.
    // NOTE(review): presumably set only for connection-scoped (temporary)
    // tables -- confirm.
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    /// IDs resolved for this table's definition.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window, if one is configured.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the table is a retained-metrics object.
    pub is_retained_metrics_object: bool,
    /// Where the table's data comes from; also determines
    /// [`Table::timeline`].
    pub data_source: TableDataSource,
}
886
887impl Table {
888 pub fn timeline(&self) -> Timeline {
889 match &self.data_source {
890 TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
893 TableDataSource::DataSource { timeline, .. } => timeline.clone(),
894 }
895 }
896
897 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
899 self.collections.values().copied()
900 }
901
902 pub fn global_id_writes(&self) -> GlobalId {
904 *self
905 .collections
906 .last_key_value()
907 .expect("at least one version of a table")
908 .1
909 }
910
911 pub fn collection_descs(
913 &self,
914 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
915 self.collections.iter().map(|(version, gid)| {
916 let desc = self
917 .desc
918 .at_version(RelationVersionSelector::Specific(*version));
919 (*gid, *version, desc)
920 })
921 }
922
923 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
925 let (version, _gid) = self
926 .collections
927 .iter()
928 .find(|(_version, gid)| *gid == id)
929 .expect("GlobalId to exist");
930 self.desc
931 .at_version(RelationVersionSelector::Specific(*version))
932 }
933}
934
/// Describes where a table's data comes from.
#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
    /// The table is populated through table writes.
    TableWrites {
        /// Default expressions for the table. Skipped during serialization.
        // NOTE(review): presumably one default expression per column --
        // confirm against the planner.
        #[serde(skip)]
        defaults: Vec<Expr<Aug>>,
    },

    /// The table is populated from a data source.
    DataSource {
        /// Description of the data source.
        desc: DataSourceDesc,
        /// Timeline of the data source; returned by `Table::timeline`.
        timeline: Timeline,
    },
}
950
/// Describes the origin of the data for a source (or a source-backed table).
#[derive(Debug, Clone, Serialize)]
pub enum DataSourceDesc {
    /// An ingestion running on a cluster.
    Ingestion {
        /// Description of the source being ingested.
        desc: SourceDesc<ReferencedConnection>,
        /// Cluster the ingestion is assigned to.
        cluster_id: ClusterId,
    },
    /// An ingestion that additionally carries export details, data
    /// configuration and a progress subsource.
    OldSyntaxIngestion {
        /// Description of the source being ingested.
        desc: SourceDesc<ReferencedConnection>,
        /// Cluster the ingestion is assigned to.
        cluster_id: ClusterId,
        /// Catalog item ID of the associated progress subsource.
        progress_subsource: CatalogItemId,
        /// Data configuration (encoding and envelope) of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
        /// Details of the export.
        details: SourceExportDetails,
    },
    /// An export of an existing ingestion.
    IngestionExport {
        /// Catalog item ID of the ingestion this export belongs to.
        ingestion_id: CatalogItemId,
        /// Name of the referenced external object.
        external_reference: UnresolvedItemName,
        /// Details of the export.
        details: SourceExportDetails,
        /// Data configuration (encoding and envelope) of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// An introspection collection of the given type.
    Introspection(IntrospectionType),
    /// A progress collection.
    Progress,
    /// A webhook source.
    Webhook {
        /// Optional request validation.
        validate_using: Option<WebhookValidation>,
        /// Format of the request body.
        body_format: WebhookBodyFormat,
        /// Header configuration.
        headers: WebhookHeaders,
        /// Cluster the webhook is assigned to.
        cluster_id: ClusterId,
    },
}
997
998impl DataSourceDesc {
999 pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1001 match &self {
1002 DataSourceDesc::Ingestion { .. } => (None, None),
1003 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1004 match &data_config.encoding.as_ref() {
1005 Some(encoding) => match &encoding.key {
1006 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1007 None => (None, Some(encoding.value.type_())),
1008 },
1009 None => (None, None),
1010 }
1011 }
1012 DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1013 Some(encoding) => match &encoding.key {
1014 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1015 None => (None, Some(encoding.value.type_())),
1016 },
1017 None => (None, None),
1018 },
1019 DataSourceDesc::Introspection(_)
1020 | DataSourceDesc::Webhook { .. }
1021 | DataSourceDesc::Progress => (None, None),
1022 }
1023 }
1024
1025 pub fn envelope(&self) -> Option<&str> {
1027 fn envelope_string(envelope: &SourceEnvelope) -> &str {
1032 match envelope {
1033 SourceEnvelope::None(_) => "none",
1034 SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1035 mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1036 mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1037 "debezium"
1041 }
1042 mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1043 "upsert-value-err-inline"
1044 }
1045 },
1046 SourceEnvelope::CdcV2 => {
1047 "materialize"
1050 }
1051 }
1052 }
1053
1054 match self {
1055 DataSourceDesc::Ingestion { .. } => None,
1060 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1061 Some(envelope_string(&data_config.envelope))
1062 }
1063 DataSourceDesc::IngestionExport { data_config, .. } => {
1064 Some(envelope_string(&data_config.envelope))
1065 }
1066 DataSourceDesc::Introspection(_)
1067 | DataSourceDesc::Webhook { .. }
1068 | DataSourceDesc::Progress => None,
1069 }
1070 }
1071}
1072
/// Catalog representation of a source.
#[derive(Debug, Clone, Serialize)]
pub struct Source {
    /// SQL that defines this source, if available.
    pub create_sql: Option<String>,
    /// [`GlobalId`] of the source; returned by [`Source::global_id`].
    pub global_id: GlobalId,
    /// Where the source's data comes from. Skipped during serialization.
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    /// Description of the source's relation.
    pub desc: RelationDesc,
    /// Timeline of the source.
    pub timeline: Timeline,
    /// IDs resolved for this source's definition.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window, if one is configured.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the source is a retained-metrics object.
    pub is_retained_metrics_object: bool,
}
1096
1097impl Source {
1098 pub fn new(
1105 plan: CreateSourcePlan,
1106 global_id: GlobalId,
1107 resolved_ids: ResolvedIds,
1108 custom_logical_compaction_window: Option<CompactionWindow>,
1109 is_retained_metrics_object: bool,
1110 ) -> Source {
1111 Source {
1112 create_sql: Some(plan.source.create_sql),
1113 data_source: match plan.source.data_source {
1114 mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1115 desc,
1116 cluster_id: plan
1117 .in_cluster
1118 .expect("ingestion-based sources must be given a cluster ID"),
1119 },
1120 mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1121 desc,
1122 progress_subsource,
1123 data_config,
1124 details,
1125 } => DataSourceDesc::OldSyntaxIngestion {
1126 desc,
1127 cluster_id: plan
1128 .in_cluster
1129 .expect("ingestion-based sources must be given a cluster ID"),
1130 progress_subsource,
1131 data_config,
1132 details,
1133 },
1134 mz_sql::plan::DataSourceDesc::Progress => {
1135 assert!(
1136 plan.in_cluster.is_none(),
1137 "subsources must not have a host config or cluster_id defined"
1138 );
1139 DataSourceDesc::Progress
1140 }
1141 mz_sql::plan::DataSourceDesc::IngestionExport {
1142 ingestion_id,
1143 external_reference,
1144 details,
1145 data_config,
1146 } => {
1147 assert!(
1148 plan.in_cluster.is_none(),
1149 "subsources must not have a host config or cluster_id defined"
1150 );
1151 DataSourceDesc::IngestionExport {
1152 ingestion_id,
1153 external_reference,
1154 details,
1155 data_config,
1156 }
1157 }
1158 mz_sql::plan::DataSourceDesc::Webhook {
1159 validate_using,
1160 body_format,
1161 headers,
1162 cluster_id,
1163 } => {
1164 mz_ore::soft_assert_or_log!(
1165 cluster_id.is_none(),
1166 "cluster_id set at Source level for Webhooks"
1167 );
1168 DataSourceDesc::Webhook {
1169 validate_using,
1170 body_format,
1171 headers,
1172 cluster_id: plan
1173 .in_cluster
1174 .expect("webhook sources must be given a cluster ID"),
1175 }
1176 }
1177 },
1178 desc: plan.source.desc,
1179 global_id,
1180 timeline: plan.timeline,
1181 resolved_ids,
1182 custom_logical_compaction_window: plan
1183 .source
1184 .compaction_window
1185 .or(custom_logical_compaction_window),
1186 is_retained_metrics_object,
1187 }
1188 }
1189
1190 pub fn source_type(&self) -> &str {
1192 match &self.data_source {
1193 DataSourceDesc::Ingestion { desc, .. }
1194 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
1195 DataSourceDesc::Progress => "progress",
1196 DataSourceDesc::IngestionExport { .. } => "subsource",
1197 DataSourceDesc::Introspection(_) => "source",
1198 DataSourceDesc::Webhook { .. } => "webhook",
1199 }
1200 }
1201
1202 pub fn connection_id(&self) -> Option<CatalogItemId> {
1204 match &self.data_source {
1205 DataSourceDesc::Ingestion { desc, .. }
1206 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
1207 DataSourceDesc::IngestionExport { .. }
1208 | DataSourceDesc::Introspection(_)
1209 | DataSourceDesc::Webhook { .. }
1210 | DataSourceDesc::Progress => None,
1211 }
1212 }
1213
1214 pub fn global_id(&self) -> GlobalId {
1216 self.global_id
1217 }
1218
1219 pub fn user_controllable_persist_shard_count(&self) -> i64 {
1227 match &self.data_source {
1228 DataSourceDesc::Ingestion { .. } => 0,
1229 DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1230 match &desc.connection {
1231 GenericSourceConnection::Postgres(_)
1235 | GenericSourceConnection::MySql(_)
1236 | GenericSourceConnection::SqlServer(_) => 0,
1237 GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
1238 LoadGenerator::Clock
1240 | LoadGenerator::Counter { .. }
1241 | LoadGenerator::Datums
1242 | LoadGenerator::KeyValue(_) => 1,
1243 LoadGenerator::Auction
1244 | LoadGenerator::Marketing
1245 | LoadGenerator::Tpch { .. } => 0,
1246 },
1247 GenericSourceConnection::Kafka(_) => 1,
1248 }
1249 }
1250 DataSourceDesc::IngestionExport { .. } => 1,
1253 DataSourceDesc::Webhook { .. } => 1,
1254 DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => 0,
1257 }
1258 }
1259}
1260
/// Catalog representation of a log collection.
#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// Which log this item represents.
    pub variant: LogVariant,
    /// [`GlobalId`] of the log; returned by [`Log::global_id`].
    pub global_id: GlobalId,
}
1268
1269impl Log {
1270 pub fn global_id(&self) -> GlobalId {
1272 self.global_id
1273 }
1274}
1275
/// Catalog representation of a sink.
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    /// SQL that defines this sink.
    pub create_sql: String,
    /// [`GlobalId`] of the sink; returned by [`Sink::global_id`].
    pub global_id: GlobalId,
    /// [`GlobalId`] of the collection the sink reads from.
    pub from: GlobalId,
    /// Connection the sink writes to.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// Envelope used by the sink.
    pub envelope: SinkEnvelope,
    /// Whether the sink is configured with a snapshot.
    pub with_snapshot: bool,
    /// Version number of the sink.
    // NOTE(review): what bumps this version is not visible here -- confirm.
    pub version: u64,
    /// IDs resolved for this sink's definition.
    pub resolved_ids: ResolvedIds,
    /// Cluster the sink is assigned to.
    pub cluster_id: ClusterId,
    /// Commit interval, if one is configured.
    pub commit_interval: Option<Duration>,
}
1301
1302impl Sink {
1303 pub fn sink_type(&self) -> &str {
1304 self.connection.name()
1305 }
1306
1307 pub fn envelope(&self) -> Option<&str> {
1309 match &self.envelope {
1310 SinkEnvelope::Debezium => Some("debezium"),
1311 SinkEnvelope::Upsert => Some("upsert"),
1312 }
1313 }
1314
1315 pub fn combined_format(&self) -> Option<Cow<'_, str>> {
1320 match &self.connection {
1321 StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
1322 _ => None,
1323 }
1324 }
1325
1326 pub fn formats(&self) -> Option<(Option<&str>, &str)> {
1328 match &self.connection {
1329 StorageSinkConnection::Kafka(connection) => {
1330 let key_format = connection
1331 .format
1332 .key_format
1333 .as_ref()
1334 .map(|f| f.get_format_name());
1335 let value_format = connection.format.value_format.get_format_name();
1336 Some((key_format, value_format))
1337 }
1338 _ => None,
1339 }
1340 }
1341
1342 pub fn connection_id(&self) -> Option<CatalogItemId> {
1343 self.connection.connection_id()
1344 }
1345
1346 pub fn global_id(&self) -> GlobalId {
1348 self.global_id
1349 }
1350}
1351
/// Catalog representation of a (non-materialized) view.
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// SQL text of the statement that defines this view.
    pub create_sql: String,
    /// The [`GlobalId`] assigned to this view.
    pub global_id: GlobalId,
    /// The view's query as a shared [`HirRelationExpr`].
    pub raw_expr: Arc<HirRelationExpr>,
    /// The view's query as a shared [`OptimizedMirRelationExpr`].
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// The relation description returned for this view by `CatalogItem::relation_desc`.
    pub desc: RelationDesc,
    /// The owning connection, if any; `CatalogItem::is_temporary` treats `Some` as temporary.
    pub conn_id: Option<ConnectionId>,
    /// The IDs this view references; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Additional item dependencies; merged with the references by `CatalogItem::uses`.
    pub dependencies: DependencyIds,
}
1371
1372impl View {
1373 pub fn global_id(&self) -> GlobalId {
1375 self.global_id
1376 }
1377}
1378
/// Catalog representation of a materialized view.
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// SQL text of the statement that defines this materialized view.
    pub create_sql: String,
    /// The [`GlobalId`] recorded for each [`RelationVersion`] of this materialized view.
    ///
    /// Serialized through `mz_ore::serde::map_key_to_string`. Methods on this type expect the
    /// map to hold at least one version and panic otherwise.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// The materialized view's query as a shared [`HirRelationExpr`].
    pub raw_expr: Arc<HirRelationExpr>,
    /// The materialized view's query as a shared [`OptimizedMirRelationExpr`].
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// The versioned relation description; resolved per version with `at_version`.
    pub desc: VersionedRelationDesc,
    /// The IDs this materialized view references; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Additional item dependencies; merged with the references by `CatalogItem::uses`.
    pub dependencies: DependencyIds,
    /// When set, the catalog item this materialized view is a replacement for.
    ///
    /// `apply_replacement` requires it to be set on the replacement and clears it on the result.
    pub replacement_target: Option<CatalogItemId>,
    /// The cluster reported for this materialized view by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// NOTE(review): presumably indexes of columns asserted to be non-null — confirm.
    pub non_null_assertions: Vec<usize>,
    /// Custom compaction window, if one is configured; `None` otherwise.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Optional refresh schedule of the materialized view.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// Optional initial as-of frontier of the materialized view.
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1414
1415impl MaterializedView {
1416 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1418 self.collections.values().copied()
1419 }
1420
1421 pub fn global_id_writes(&self) -> GlobalId {
1424 *self
1425 .collections
1426 .last_key_value()
1427 .expect("at least one version of a materialized view")
1428 .1
1429 }
1430
1431 pub fn collection_descs(
1433 &self,
1434 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
1435 self.collections.iter().map(|(version, gid)| {
1436 let desc = self
1437 .desc
1438 .at_version(RelationVersionSelector::Specific(*version));
1439 (*gid, *version, desc)
1440 })
1441 }
1442
1443 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
1445 let (version, _gid) = self
1446 .collections
1447 .iter()
1448 .find(|(_version, gid)| *gid == id)
1449 .expect("GlobalId to exist");
1450 self.desc
1451 .at_version(RelationVersionSelector::Specific(*version))
1452 }
1453
1454 pub fn apply_replacement(&mut self, replacement: Self) {
1456 let target_id = replacement
1457 .replacement_target
1458 .expect("replacement has target");
1459
1460 fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
1461 let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
1462 panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
1463 });
1464 if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
1465 cmvs
1466 } else {
1467 panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
1468 }
1469 }
1470
1471 let old_stmt = parse(&self.create_sql);
1472 let rpl_stmt = parse(&replacement.create_sql);
1473 let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
1474 if_exists: old_stmt.if_exists,
1475 name: old_stmt.name,
1476 columns: rpl_stmt.columns,
1477 replacement_for: None,
1478 in_cluster: rpl_stmt.in_cluster,
1479 query: rpl_stmt.query,
1480 as_of: rpl_stmt.as_of,
1481 with_options: rpl_stmt.with_options,
1482 };
1483 let create_sql = new_stmt.to_ast_string_stable();
1484
1485 let mut collections = std::mem::take(&mut self.collections);
1486 let latest_version = collections.keys().max().expect("at least one version");
1490 let new_version = latest_version.bump();
1491 collections.insert(new_version, replacement.global_id_writes());
1492
1493 let mut resolved_ids = replacement.resolved_ids;
1494 resolved_ids.remove_item(&target_id);
1495 let mut dependencies = replacement.dependencies;
1496 dependencies.0.remove(&target_id);
1497
1498 *self = Self {
1499 create_sql,
1500 collections,
1501 raw_expr: replacement.raw_expr,
1502 optimized_expr: replacement.optimized_expr,
1503 desc: replacement.desc,
1504 resolved_ids,
1505 dependencies,
1506 replacement_target: None,
1507 cluster_id: replacement.cluster_id,
1508 non_null_assertions: replacement.non_null_assertions,
1509 custom_logical_compaction_window: replacement.custom_logical_compaction_window,
1510 refresh_schedule: replacement.refresh_schedule,
1511 initial_as_of: replacement.initial_as_of,
1512 };
1513 }
1514}
1515
/// Catalog representation of an index.
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// SQL text of the statement that defines this index.
    pub create_sql: String,
    /// The [`GlobalId`] assigned to this index.
    pub global_id: GlobalId,
    /// NOTE(review): presumably the collection the index is built on — confirm.
    pub on: GlobalId,
    /// The key expressions of the index, shared behind an [`Arc`].
    pub keys: Arc<[MirScalarExpr]>,
    /// The owning connection, if any; `CatalogItem::is_temporary` treats `Some` as temporary.
    pub conn_id: Option<ConnectionId>,
    /// The IDs this index references; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// The cluster reported for this index by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// Custom compaction window, if one is configured; `None` otherwise.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Flag returned by `CatalogItem::is_retained_metrics_object` for this index.
    pub is_retained_metrics_object: bool,
}
1540
1541impl Index {
1542 pub fn global_id(&self) -> GlobalId {
1544 self.global_id
1545 }
1546}
1547
/// Catalog representation of a type.
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// SQL text of the statement that defines this type, when there is one.
    ///
    /// `CatalogItem::to_serialized` expects this to be set and otherwise panics with
    /// "builtin types cannot be serialized".
    pub create_sql: Option<String>,
    /// The [`GlobalId`] assigned to this type.
    pub global_id: GlobalId,
    /// The details of the type; excluded from serialization.
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// The IDs this type references; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
}
1559
/// Catalog representation of a function.
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// The statically allocated function definition; excluded from serialization.
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
    /// The [`GlobalId`] assigned to this function.
    pub global_id: GlobalId,
}
1568
/// Catalog representation of a secret.
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// SQL text of the statement that defines this secret.
    pub create_sql: String,
    /// The [`GlobalId`] assigned to this secret.
    pub global_id: GlobalId,
}
1576
/// Catalog representation of a connection.
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// SQL text of the statement that defines this connection.
    pub create_sql: String,
    /// The [`GlobalId`] assigned to this connection.
    pub global_id: GlobalId,
    /// The details of the connection.
    pub details: ConnectionDetails,
    /// The IDs this connection references; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
}
1588
1589impl Connection {
1590 pub fn global_id(&self) -> GlobalId {
1592 self.global_id
1593 }
1594}
1595
/// Catalog representation of a continual task.
#[derive(Debug, Clone, Serialize)]
pub struct ContinualTask {
    /// SQL text of the statement that defines this continual task.
    pub create_sql: String,
    /// The [`GlobalId`] assigned to this continual task.
    pub global_id: GlobalId,
    /// NOTE(review): presumably the collection the task takes as input — confirm.
    pub input_id: GlobalId,
    /// NOTE(review): presumably whether the task processes an initial snapshot — confirm.
    pub with_snapshot: bool,
    /// The task's query as a shared [`HirRelationExpr`].
    pub raw_expr: Arc<HirRelationExpr>,
    /// The relation description returned for this task by `CatalogItem::relation_desc`.
    pub desc: RelationDesc,
    /// The IDs this continual task references; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Additional item dependencies; merged with the references by `CatalogItem::uses`.
    pub dependencies: DependencyIds,
    /// The cluster reported for this continual task by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// Optional initial as-of frontier of the continual task.
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1621
1622impl ContinualTask {
1623 pub fn global_id(&self) -> GlobalId {
1625 self.global_id
1626 }
1627}
1628
/// In-memory catalog representation of a network policy.
///
/// Converts to and from `durable::NetworkPolicy`; the only field whose representation differs
/// between the two is `privileges`.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
    /// The name of the policy.
    pub name: String,
    /// The ID of the policy.
    pub id: NetworkPolicyId,
    /// The OID of the policy.
    pub oid: u32,
    /// The rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
    /// The role that owns the policy.
    pub owner_id: RoleId,
    /// The privileges on the policy, held as a [`PrivilegeMap`].
    pub privileges: PrivilegeMap,
}
1638
1639impl From<NetworkPolicy> for durable::NetworkPolicy {
1640 fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1641 durable::NetworkPolicy {
1642 id: policy.id,
1643 oid: policy.oid,
1644 name: policy.name,
1645 rules: policy.rules,
1646 owner_id: policy.owner_id,
1647 privileges: policy.privileges.into_all_values().collect(),
1648 }
1649 }
1650}
1651
1652impl From<durable::NetworkPolicy> for NetworkPolicy {
1653 fn from(
1654 durable::NetworkPolicy {
1655 id,
1656 oid,
1657 name,
1658 rules,
1659 owner_id,
1660 privileges,
1661 }: durable::NetworkPolicy,
1662 ) -> Self {
1663 NetworkPolicy {
1664 id,
1665 oid,
1666 name,
1667 rules,
1668 owner_id,
1669 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1670 }
1671 }
1672}
1673
1674impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1675 fn update_from(
1676 &mut self,
1677 durable::NetworkPolicy {
1678 id,
1679 oid,
1680 name,
1681 rules,
1682 owner_id,
1683 privileges,
1684 }: durable::NetworkPolicy,
1685 ) {
1686 self.id = id;
1687 self.oid = oid;
1688 self.name = name;
1689 self.rules = rules;
1690 self.owner_id = owner_id;
1691 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1692 }
1693}
1694
1695impl CatalogItem {
1696 pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1698 match self {
1699 CatalogItem::Table(_) => CatalogItemType::Table,
1700 CatalogItem::Source(_) => CatalogItemType::Source,
1701 CatalogItem::Log(_) => CatalogItemType::Source,
1702 CatalogItem::Sink(_) => CatalogItemType::Sink,
1703 CatalogItem::View(_) => CatalogItemType::View,
1704 CatalogItem::MaterializedView(_) => CatalogItemType::MaterializedView,
1705 CatalogItem::Index(_) => CatalogItemType::Index,
1706 CatalogItem::Type(_) => CatalogItemType::Type,
1707 CatalogItem::Func(_) => CatalogItemType::Func,
1708 CatalogItem::Secret(_) => CatalogItemType::Secret,
1709 CatalogItem::Connection(_) => CatalogItemType::Connection,
1710 CatalogItem::ContinualTask(_) => CatalogItemType::ContinualTask,
1711 }
1712 }
1713
1714 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1716 let gid = match self {
1717 CatalogItem::Source(source) => source.global_id,
1718 CatalogItem::Log(log) => log.global_id,
1719 CatalogItem::Sink(sink) => sink.global_id,
1720 CatalogItem::View(view) => view.global_id,
1721 CatalogItem::MaterializedView(mv) => {
1722 return itertools::Either::Left(mv.collections.values().copied());
1723 }
1724 CatalogItem::ContinualTask(ct) => ct.global_id,
1725 CatalogItem::Index(index) => index.global_id,
1726 CatalogItem::Func(func) => func.global_id,
1727 CatalogItem::Type(ty) => ty.global_id,
1728 CatalogItem::Secret(secret) => secret.global_id,
1729 CatalogItem::Connection(conn) => conn.global_id,
1730 CatalogItem::Table(table) => {
1731 return itertools::Either::Left(table.collections.values().copied());
1732 }
1733 };
1734 itertools::Either::Right(std::iter::once(gid))
1735 }
1736
1737 pub fn latest_global_id(&self) -> GlobalId {
1741 match self {
1742 CatalogItem::Source(source) => source.global_id,
1743 CatalogItem::Log(log) => log.global_id,
1744 CatalogItem::Sink(sink) => sink.global_id,
1745 CatalogItem::View(view) => view.global_id,
1746 CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
1747 CatalogItem::ContinualTask(ct) => ct.global_id,
1748 CatalogItem::Index(index) => index.global_id,
1749 CatalogItem::Func(func) => func.global_id,
1750 CatalogItem::Type(ty) => ty.global_id,
1751 CatalogItem::Secret(secret) => secret.global_id,
1752 CatalogItem::Connection(conn) => conn.global_id,
1753 CatalogItem::Table(table) => table.global_id_writes(),
1754 }
1755 }
1756
1757 pub fn is_storage_collection(&self) -> bool {
1759 match self {
1760 CatalogItem::Table(_)
1761 | CatalogItem::Source(_)
1762 | CatalogItem::MaterializedView(_)
1763 | CatalogItem::Sink(_)
1764 | CatalogItem::ContinualTask(_) => true,
1765 CatalogItem::Log(_)
1766 | CatalogItem::View(_)
1767 | CatalogItem::Index(_)
1768 | CatalogItem::Type(_)
1769 | CatalogItem::Func(_)
1770 | CatalogItem::Secret(_)
1771 | CatalogItem::Connection(_) => false,
1772 }
1773 }
1774
1775 pub fn relation_desc(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1784 match &self {
1785 CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1786 CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1787 CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1788 CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1789 CatalogItem::MaterializedView(mview) => {
1790 Some(Cow::Owned(mview.desc.at_version(version)))
1791 }
1792 CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
1793 CatalogItem::Func(_)
1794 | CatalogItem::Index(_)
1795 | CatalogItem::Sink(_)
1796 | CatalogItem::Secret(_)
1797 | CatalogItem::Connection(_)
1798 | CatalogItem::Type(_) => None,
1799 }
1800 }
1801
1802 pub fn func(
1803 &self,
1804 entry: &CatalogEntry,
1805 ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1806 match &self {
1807 CatalogItem::Func(func) => Ok(func.inner),
1808 _ => Err(SqlCatalogError::UnexpectedType {
1809 name: entry.name().item.to_string(),
1810 actual_type: entry.item_type(),
1811 expected_type: CatalogItemType::Func,
1812 }),
1813 }
1814 }
1815
1816 pub fn source_desc(
1817 &self,
1818 entry: &CatalogEntry,
1819 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1820 match &self {
1821 CatalogItem::Source(source) => match &source.data_source {
1822 DataSourceDesc::Ingestion { desc, .. }
1823 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1824 DataSourceDesc::IngestionExport { .. }
1825 | DataSourceDesc::Introspection(_)
1826 | DataSourceDesc::Webhook { .. }
1827 | DataSourceDesc::Progress => Ok(None),
1828 },
1829 _ => Err(SqlCatalogError::UnexpectedType {
1830 name: entry.name().item.to_string(),
1831 actual_type: entry.item_type(),
1832 expected_type: CatalogItemType::Source,
1833 }),
1834 }
1835 }
1836
1837 pub fn is_progress_source(&self) -> bool {
1839 matches!(
1840 self,
1841 CatalogItem::Source(Source {
1842 data_source: DataSourceDesc::Progress,
1843 ..
1844 })
1845 )
1846 }
1847
1848 pub fn references(&self) -> &ResolvedIds {
1851 static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1852 match self {
1853 CatalogItem::Func(_) => &*EMPTY,
1854 CatalogItem::Index(idx) => &idx.resolved_ids,
1855 CatalogItem::Sink(sink) => &sink.resolved_ids,
1856 CatalogItem::Source(source) => &source.resolved_ids,
1857 CatalogItem::Log(_) => &*EMPTY,
1858 CatalogItem::Table(table) => &table.resolved_ids,
1859 CatalogItem::Type(typ) => &typ.resolved_ids,
1860 CatalogItem::View(view) => &view.resolved_ids,
1861 CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1862 CatalogItem::Secret(_) => &*EMPTY,
1863 CatalogItem::Connection(connection) => &connection.resolved_ids,
1864 CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
1865 }
1866 }
1867
1868 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1874 let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1875 match self {
1876 CatalogItem::Func(_) => {}
1879 CatalogItem::Index(_) => {}
1880 CatalogItem::Sink(_) => {}
1881 CatalogItem::Source(_) => {}
1882 CatalogItem::Log(_) => {}
1883 CatalogItem::Table(_) => {}
1884 CatalogItem::Type(_) => {}
1885 CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1886 CatalogItem::MaterializedView(mview) => {
1887 uses.extend(mview.dependencies.0.iter().copied())
1888 }
1889 CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
1890 CatalogItem::Secret(_) => {}
1891 CatalogItem::Connection(_) => {}
1892 }
1893 uses
1894 }
1895
1896 pub fn conn_id(&self) -> Option<&ConnectionId> {
1899 match self {
1900 CatalogItem::View(view) => view.conn_id.as_ref(),
1901 CatalogItem::Index(index) => index.conn_id.as_ref(),
1902 CatalogItem::Table(table) => table.conn_id.as_ref(),
1903 CatalogItem::Log(_)
1904 | CatalogItem::Source(_)
1905 | CatalogItem::Sink(_)
1906 | CatalogItem::MaterializedView(_)
1907 | CatalogItem::Secret(_)
1908 | CatalogItem::Type(_)
1909 | CatalogItem::Func(_)
1910 | CatalogItem::Connection(_)
1911 | CatalogItem::ContinualTask(_) => None,
1912 }
1913 }
1914
1915 pub fn set_conn_id(&mut self, conn_id: Option<ConnectionId>) {
1918 match self {
1919 CatalogItem::View(view) => view.conn_id = conn_id,
1920 CatalogItem::Index(index) => index.conn_id = conn_id,
1921 CatalogItem::Table(table) => table.conn_id = conn_id,
1922 CatalogItem::Log(_)
1923 | CatalogItem::Source(_)
1924 | CatalogItem::Sink(_)
1925 | CatalogItem::MaterializedView(_)
1926 | CatalogItem::Secret(_)
1927 | CatalogItem::Type(_)
1928 | CatalogItem::Func(_)
1929 | CatalogItem::Connection(_)
1930 | CatalogItem::ContinualTask(_) => (),
1931 }
1932 }
1933
1934 pub fn is_temporary(&self) -> bool {
1936 self.conn_id().is_some()
1937 }
1938
1939 pub fn rename_schema_refs(
1940 &self,
1941 database_name: &str,
1942 cur_schema_name: &str,
1943 new_schema_name: &str,
1944 ) -> Result<CatalogItem, (String, String)> {
1945 let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
1946 let mut create_stmt = mz_sql::parse::parse(&create_sql)
1947 .expect("invalid create sql persisted to catalog")
1948 .into_element()
1949 .ast;
1950
1951 mz_sql::ast::transform::create_stmt_rename_schema_refs(
1953 &mut create_stmt,
1954 database_name,
1955 cur_schema_name,
1956 new_schema_name,
1957 )?;
1958
1959 Ok(create_stmt.to_ast_string_stable())
1960 };
1961
1962 match self {
1963 CatalogItem::Table(i) => {
1964 let mut i = i.clone();
1965 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1966 Ok(CatalogItem::Table(i))
1967 }
1968 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1969 CatalogItem::Source(i) => {
1970 let mut i = i.clone();
1971 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1972 Ok(CatalogItem::Source(i))
1973 }
1974 CatalogItem::Sink(i) => {
1975 let mut i = i.clone();
1976 i.create_sql = do_rewrite(i.create_sql)?;
1977 Ok(CatalogItem::Sink(i))
1978 }
1979 CatalogItem::View(i) => {
1980 let mut i = i.clone();
1981 i.create_sql = do_rewrite(i.create_sql)?;
1982 Ok(CatalogItem::View(i))
1983 }
1984 CatalogItem::MaterializedView(i) => {
1985 let mut i = i.clone();
1986 i.create_sql = do_rewrite(i.create_sql)?;
1987 Ok(CatalogItem::MaterializedView(i))
1988 }
1989 CatalogItem::Index(i) => {
1990 let mut i = i.clone();
1991 i.create_sql = do_rewrite(i.create_sql)?;
1992 Ok(CatalogItem::Index(i))
1993 }
1994 CatalogItem::Secret(i) => {
1995 let mut i = i.clone();
1996 i.create_sql = do_rewrite(i.create_sql)?;
1997 Ok(CatalogItem::Secret(i))
1998 }
1999 CatalogItem::Connection(i) => {
2000 let mut i = i.clone();
2001 i.create_sql = do_rewrite(i.create_sql)?;
2002 Ok(CatalogItem::Connection(i))
2003 }
2004 CatalogItem::Type(i) => {
2005 let mut i = i.clone();
2006 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2007 Ok(CatalogItem::Type(i))
2008 }
2009 CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
2010 CatalogItem::ContinualTask(i) => {
2011 let mut i = i.clone();
2012 i.create_sql = do_rewrite(i.create_sql)?;
2013 Ok(CatalogItem::ContinualTask(i))
2014 }
2015 }
2016 }
2017
2018 pub fn rename_item_refs(
2022 &self,
2023 from: FullItemName,
2024 to_item_name: String,
2025 rename_self: bool,
2026 ) -> Result<CatalogItem, String> {
2027 let do_rewrite = |create_sql: String| -> Result<String, String> {
2028 let mut create_stmt = mz_sql::parse::parse(&create_sql)
2029 .expect("invalid create sql persisted to catalog")
2030 .into_element()
2031 .ast;
2032 if rename_self {
2033 mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
2034 }
2035 mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
2037 Ok(create_stmt.to_ast_string_stable())
2038 };
2039
2040 match self {
2041 CatalogItem::Table(i) => {
2042 let mut i = i.clone();
2043 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2044 Ok(CatalogItem::Table(i))
2045 }
2046 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
2047 CatalogItem::Source(i) => {
2048 let mut i = i.clone();
2049 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2050 Ok(CatalogItem::Source(i))
2051 }
2052 CatalogItem::Sink(i) => {
2053 let mut i = i.clone();
2054 i.create_sql = do_rewrite(i.create_sql)?;
2055 Ok(CatalogItem::Sink(i))
2056 }
2057 CatalogItem::View(i) => {
2058 let mut i = i.clone();
2059 i.create_sql = do_rewrite(i.create_sql)?;
2060 Ok(CatalogItem::View(i))
2061 }
2062 CatalogItem::MaterializedView(i) => {
2063 let mut i = i.clone();
2064 i.create_sql = do_rewrite(i.create_sql)?;
2065 Ok(CatalogItem::MaterializedView(i))
2066 }
2067 CatalogItem::Index(i) => {
2068 let mut i = i.clone();
2069 i.create_sql = do_rewrite(i.create_sql)?;
2070 Ok(CatalogItem::Index(i))
2071 }
2072 CatalogItem::Secret(i) => {
2073 let mut i = i.clone();
2074 i.create_sql = do_rewrite(i.create_sql)?;
2075 Ok(CatalogItem::Secret(i))
2076 }
2077 CatalogItem::Func(_) | CatalogItem::Type(_) => {
2078 unreachable!("{}s cannot be renamed", self.typ())
2079 }
2080 CatalogItem::Connection(i) => {
2081 let mut i = i.clone();
2082 i.create_sql = do_rewrite(i.create_sql)?;
2083 Ok(CatalogItem::Connection(i))
2084 }
2085 CatalogItem::ContinualTask(i) => {
2086 let mut i = i.clone();
2087 i.create_sql = do_rewrite(i.create_sql)?;
2088 Ok(CatalogItem::ContinualTask(i))
2089 }
2090 }
2091 }
2092
2093 pub fn replace_item_refs(&self, old_id: CatalogItemId, new_id: CatalogItemId) -> CatalogItem {
2095 let do_rewrite = |create_sql: String| -> String {
2096 let mut create_stmt = mz_sql::parse::parse(&create_sql)
2097 .expect("invalid create sql persisted to catalog")
2098 .into_element()
2099 .ast;
2100 mz_sql::ast::transform::create_stmt_replace_ids(
2101 &mut create_stmt,
2102 &[(old_id, new_id)].into(),
2103 );
2104 create_stmt.to_ast_string_stable()
2105 };
2106
2107 match self {
2108 CatalogItem::Table(i) => {
2109 let mut i = i.clone();
2110 i.create_sql = i.create_sql.map(do_rewrite);
2111 CatalogItem::Table(i)
2112 }
2113 CatalogItem::Log(i) => CatalogItem::Log(i.clone()),
2114 CatalogItem::Source(i) => {
2115 let mut i = i.clone();
2116 i.create_sql = i.create_sql.map(do_rewrite);
2117 CatalogItem::Source(i)
2118 }
2119 CatalogItem::Sink(i) => {
2120 let mut i = i.clone();
2121 i.create_sql = do_rewrite(i.create_sql);
2122 CatalogItem::Sink(i)
2123 }
2124 CatalogItem::View(i) => {
2125 let mut i = i.clone();
2126 i.create_sql = do_rewrite(i.create_sql);
2127 CatalogItem::View(i)
2128 }
2129 CatalogItem::MaterializedView(i) => {
2130 let mut i = i.clone();
2131 i.create_sql = do_rewrite(i.create_sql);
2132 CatalogItem::MaterializedView(i)
2133 }
2134 CatalogItem::Index(i) => {
2135 let mut i = i.clone();
2136 i.create_sql = do_rewrite(i.create_sql);
2137 CatalogItem::Index(i)
2138 }
2139 CatalogItem::Secret(i) => {
2140 let mut i = i.clone();
2141 i.create_sql = do_rewrite(i.create_sql);
2142 CatalogItem::Secret(i)
2143 }
2144 CatalogItem::Func(_) | CatalogItem::Type(_) => {
2145 unreachable!("references of {}s cannot be replaced", self.typ())
2146 }
2147 CatalogItem::Connection(i) => {
2148 let mut i = i.clone();
2149 i.create_sql = do_rewrite(i.create_sql);
2150 CatalogItem::Connection(i)
2151 }
2152 CatalogItem::ContinualTask(i) => {
2153 let mut i = i.clone();
2154 i.create_sql = do_rewrite(i.create_sql);
2155 CatalogItem::ContinualTask(i)
2156 }
2157 }
2158 }
/// Updates the `RETAIN HISTORY` option in this item's `create_sql` and records `window` as
/// the item's custom compaction window.
///
/// When `value` is `Some`, the last existing `RetainHistory` option is replaced, or a new one
/// is appended if there is none. When `value` is `None`, the last existing option is removed.
/// On success, returns the value of the option that was replaced or removed, if any.
///
/// # Errors
///
/// Returns `Err(())` when the statement is not a `CREATE TABLE`, `CREATE INDEX`,
/// `CREATE SOURCE` or `CREATE MATERIALIZED VIEW`, or when `update_sql` fails.
///
/// # Panics
///
/// Panics if the SQL update succeeds but the item has no compaction window slot.
pub fn update_retain_history(
    &mut self,
    value: Option<Value>,
    window: CompactionWindow,
) -> Result<Option<WithOptionValue<Raw>>, ()> {
    let update = |mut ast: &mut Statement<Raw>| {
        // Each supported statement has its own option and option-name types, so the shared
        // logic is expressed as a macro parameterized over them.
        macro_rules! update_retain_history {
            ( $stmt:ident, $opt:ident, $name:ident ) => {{
                // Locate the last `RetainHistory` option, searching from the back.
                let pos = $stmt
                    .with_options
                    .iter()
                    .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
                if let Some(value) = value {
                    let next = mz_sql_parser::ast::$opt {
                        name: mz_sql_parser::ast::$name::RetainHistory,
                        value: Some(WithOptionValue::RetainHistoryFor(value)),
                    };
                    if let Some(idx) = pos {
                        // Replace in place and hand back the old value.
                        let previous = $stmt.with_options[idx].clone();
                        $stmt.with_options[idx] = next;
                        previous.value
                    } else {
                        $stmt.with_options.push(next);
                        None
                    }
                } else {
                    if let Some(idx) = pos {
                        // `swap_remove` moves the last option into the vacated slot, so the
                        // order of the remaining options may change.
                        $stmt.with_options.swap_remove(idx).value
                    } else {
                        None
                    }
                }
            }};
        }
        let previous = match &mut ast {
            Statement::CreateTable(stmt) => {
                update_retain_history!(stmt, TableOption, TableOptionName)
            }
            Statement::CreateIndex(stmt) => {
                update_retain_history!(stmt, IndexOption, IndexOptionName)
            }
            Statement::CreateSource(stmt) => {
                update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
            }
            Statement::CreateMaterializedView(stmt) => {
                update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
            }
            _ => {
                return Err(());
            }
        };
        Ok(previous)
    };

    let res = self.update_sql(update)?;
    // The in-memory window is only touched once the SQL rewrite has succeeded.
    let cw = self
        .custom_logical_compaction_window_mut()
        .expect("item must have compaction window");
    *cw = Some(window);
    Ok(res)
}
2225
2226 pub fn add_column(
2227 &mut self,
2228 name: ColumnName,
2229 typ: SqlColumnType,
2230 sql: RawDataType,
2231 ) -> Result<RelationVersion, PlanError> {
2232 let CatalogItem::Table(table) = self else {
2233 return Err(PlanError::Unsupported {
2234 feature: "adding columns to a non-Table".to_string(),
2235 discussion_no: None,
2236 });
2237 };
2238 let next_version = table.desc.add_column(name.clone(), typ);
2239
2240 let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2241 Statement::CreateTable(stmt) => {
2242 let version = ColumnOptionDef {
2243 name: None,
2244 option: ColumnOption::Versioned {
2245 action: ColumnVersioned::Added,
2246 version: next_version.into(),
2247 },
2248 };
2249 let column = ColumnDef {
2250 name: name.into(),
2251 data_type: sql,
2252 collation: None,
2253 options: vec![version],
2254 };
2255 stmt.columns.push(column);
2256 Ok(())
2257 }
2258 _ => Err(()),
2259 };
2260
2261 self.update_sql(update)
2262 .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2263 Ok(next_version)
2264 }
2265
2266 pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2269 where
2270 F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2271 {
2272 let create_sql = match self {
2273 CatalogItem::Table(Table { create_sql, .. })
2274 | CatalogItem::Type(Type { create_sql, .. })
2275 | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2276 CatalogItem::Sink(Sink { create_sql, .. })
2277 | CatalogItem::View(View { create_sql, .. })
2278 | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2279 | CatalogItem::Index(Index { create_sql, .. })
2280 | CatalogItem::Secret(Secret { create_sql, .. })
2281 | CatalogItem::Connection(Connection { create_sql, .. })
2282 | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
2283 CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2284 };
2285 let Some(create_sql) = create_sql else {
2286 return Err(());
2287 };
2288 let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2289 .expect("non-system items must be parseable")
2290 .into_element()
2291 .ast;
2292 debug!("rewrite: {}", ast.to_ast_string_redacted());
2293 let t = f(&mut ast)?;
2294 *create_sql = ast.to_ast_string_stable();
2295 debug!("rewrote: {}", ast.to_ast_string_redacted());
2296 Ok(t)
2297 }
2298
2299 pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2306 match self {
2307 CatalogItem::Index(index) => Some(index.cluster_id),
2308 CatalogItem::Table(_)
2309 | CatalogItem::Source(_)
2310 | CatalogItem::Log(_)
2311 | CatalogItem::View(_)
2312 | CatalogItem::MaterializedView(_)
2313 | CatalogItem::Sink(_)
2314 | CatalogItem::Type(_)
2315 | CatalogItem::Func(_)
2316 | CatalogItem::Secret(_)
2317 | CatalogItem::Connection(_)
2318 | CatalogItem::ContinualTask(_) => None,
2319 }
2320 }
2321
2322 pub fn cluster_id(&self) -> Option<ClusterId> {
2323 match self {
2324 CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2325 CatalogItem::Index(index) => Some(index.cluster_id),
2326 CatalogItem::Source(source) => match &source.data_source {
2327 DataSourceDesc::Ingestion { cluster_id, .. }
2328 | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
2329 DataSourceDesc::IngestionExport { .. } => None,
2333 DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2334 DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => None,
2335 },
2336 CatalogItem::Sink(sink) => Some(sink.cluster_id),
2337 CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
2338 CatalogItem::Table(_)
2339 | CatalogItem::Log(_)
2340 | CatalogItem::View(_)
2341 | CatalogItem::Type(_)
2342 | CatalogItem::Func(_)
2343 | CatalogItem::Secret(_)
2344 | CatalogItem::Connection(_) => None,
2345 }
2346 }
2347
2348 pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2351 match self {
2352 CatalogItem::Table(table) => table.custom_logical_compaction_window,
2353 CatalogItem::Source(source) => source.custom_logical_compaction_window,
2354 CatalogItem::Index(index) => index.custom_logical_compaction_window,
2355 CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2356 CatalogItem::Log(_)
2357 | CatalogItem::View(_)
2358 | CatalogItem::Sink(_)
2359 | CatalogItem::Type(_)
2360 | CatalogItem::Func(_)
2361 | CatalogItem::Secret(_)
2362 | CatalogItem::Connection(_)
2363 | CatalogItem::ContinualTask(_) => None,
2364 }
2365 }
2366
2367 pub fn custom_logical_compaction_window_mut(
2371 &mut self,
2372 ) -> Option<&mut Option<CompactionWindow>> {
2373 let cw = match self {
2374 CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2375 CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2376 CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2377 CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2378 CatalogItem::Log(_)
2379 | CatalogItem::View(_)
2380 | CatalogItem::Sink(_)
2381 | CatalogItem::Type(_)
2382 | CatalogItem::Func(_)
2383 | CatalogItem::Secret(_)
2384 | CatalogItem::Connection(_)
2385 | CatalogItem::ContinualTask(_) => return None,
2386 };
2387 Some(cw)
2388 }
2389
2390 pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2398 let custom_logical_compaction_window = match self {
2399 CatalogItem::Table(_)
2400 | CatalogItem::Source(_)
2401 | CatalogItem::Index(_)
2402 | CatalogItem::MaterializedView(_)
2403 | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
2404 CatalogItem::Log(_)
2405 | CatalogItem::View(_)
2406 | CatalogItem::Sink(_)
2407 | CatalogItem::Type(_)
2408 | CatalogItem::Func(_)
2409 | CatalogItem::Secret(_)
2410 | CatalogItem::Connection(_) => return None,
2411 };
2412 Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2413 }
2414
2415 pub fn is_retained_metrics_object(&self) -> bool {
2419 match self {
2420 CatalogItem::Table(table) => table.is_retained_metrics_object,
2421 CatalogItem::Source(source) => source.is_retained_metrics_object,
2422 CatalogItem::Index(index) => index.is_retained_metrics_object,
2423 CatalogItem::Log(_)
2424 | CatalogItem::View(_)
2425 | CatalogItem::MaterializedView(_)
2426 | CatalogItem::Sink(_)
2427 | CatalogItem::Type(_)
2428 | CatalogItem::Func(_)
2429 | CatalogItem::Secret(_)
2430 | CatalogItem::Connection(_)
2431 | CatalogItem::ContinualTask(_) => false,
2432 }
2433 }
2434
2435 pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2436 match self {
2437 CatalogItem::Table(table) => {
2438 let create_sql = table
2439 .create_sql
2440 .clone()
2441 .expect("builtin tables cannot be serialized");
2442 let mut collections = table.collections.clone();
2443 let global_id = collections
2444 .remove(&RelationVersion::root())
2445 .expect("at least one version");
2446 (create_sql, global_id, collections)
2447 }
2448 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2449 CatalogItem::Source(source) => {
2450 assert!(
2451 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2452 "cannot serialize introspection/builtin sources",
2453 );
2454 let create_sql = source
2455 .create_sql
2456 .clone()
2457 .expect("builtin sources cannot be serialized");
2458 (create_sql, source.global_id, BTreeMap::new())
2459 }
2460 CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2461 CatalogItem::MaterializedView(mview) => {
2462 let mut collections = mview.collections.clone();
2463 let global_id = collections
2464 .remove(&RelationVersion::root())
2465 .expect("at least one version");
2466 (mview.create_sql.clone(), global_id, collections)
2467 }
2468 CatalogItem::Index(index) => {
2469 (index.create_sql.clone(), index.global_id, BTreeMap::new())
2470 }
2471 CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2472 CatalogItem::Type(typ) => {
2473 let create_sql = typ
2474 .create_sql
2475 .clone()
2476 .expect("builtin types cannot be serialized");
2477 (create_sql, typ.global_id, BTreeMap::new())
2478 }
2479 CatalogItem::Secret(secret) => {
2480 (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2481 }
2482 CatalogItem::Connection(connection) => (
2483 connection.create_sql.clone(),
2484 connection.global_id,
2485 BTreeMap::new(),
2486 ),
2487 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2488 CatalogItem::ContinualTask(ct) => {
2489 (ct.create_sql.clone(), ct.global_id, BTreeMap::new())
2490 }
2491 }
2492 }
2493
2494 pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2495 match self {
2496 CatalogItem::Table(mut table) => {
2497 let create_sql = table
2498 .create_sql
2499 .expect("builtin tables cannot be serialized");
2500 let global_id = table
2501 .collections
2502 .remove(&RelationVersion::root())
2503 .expect("at least one version");
2504 (create_sql, global_id, table.collections)
2505 }
2506 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2507 CatalogItem::Source(source) => {
2508 assert!(
2509 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2510 "cannot serialize introspection/builtin sources",
2511 );
2512 let create_sql = source
2513 .create_sql
2514 .expect("builtin sources cannot be serialized");
2515 (create_sql, source.global_id, BTreeMap::new())
2516 }
2517 CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2518 CatalogItem::MaterializedView(mut mview) => {
2519 let global_id = mview
2520 .collections
2521 .remove(&RelationVersion::root())
2522 .expect("at least one version");
2523 (mview.create_sql, global_id, mview.collections)
2524 }
2525 CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2526 CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2527 CatalogItem::Type(typ) => {
2528 let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2529 (create_sql, typ.global_id, BTreeMap::new())
2530 }
2531 CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2532 CatalogItem::Connection(connection) => {
2533 (connection.create_sql, connection.global_id, BTreeMap::new())
2534 }
2535 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2536 CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
2537 }
2538 }
2539
2540 pub fn global_id_for_version(&self, version: RelationVersionSelector) -> Option<GlobalId> {
2543 let collections = match self {
2544 CatalogItem::MaterializedView(mv) => &mv.collections,
2545 CatalogItem::Table(table) => &table.collections,
2546 CatalogItem::Source(source) => return Some(source.global_id),
2547 CatalogItem::Log(log) => return Some(log.global_id),
2548 CatalogItem::View(view) => return Some(view.global_id),
2549 CatalogItem::Sink(sink) => return Some(sink.global_id),
2550 CatalogItem::Index(index) => return Some(index.global_id),
2551 CatalogItem::Type(ty) => return Some(ty.global_id),
2552 CatalogItem::Func(func) => return Some(func.global_id),
2553 CatalogItem::Secret(secret) => return Some(secret.global_id),
2554 CatalogItem::Connection(conn) => return Some(conn.global_id),
2555 CatalogItem::ContinualTask(ct) => return Some(ct.global_id),
2556 };
2557 match version {
2558 RelationVersionSelector::Latest => collections.values().last().copied(),
2559 RelationVersionSelector::Specific(version) => collections.get(&version).copied(),
2560 }
2561 }
2562}
2563
2564impl CatalogEntry {
2565 pub fn relation_desc_latest(&self) -> Option<Cow<'_, RelationDesc>> {
2568 self.item.relation_desc(RelationVersionSelector::Latest)
2569 }
2570
2571 pub fn has_columns(&self) -> bool {
2573 match self.item() {
2574 CatalogItem::Type(Type { details, .. }) => {
2575 matches!(details.typ, CatalogType::Record { .. })
2576 }
2577 _ => self.relation_desc_latest().is_some(),
2578 }
2579 }
2580
2581 pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2583 self.item.func(self)
2584 }
2585
2586 pub fn index(&self) -> Option<&Index> {
2588 match self.item() {
2589 CatalogItem::Index(idx) => Some(idx),
2590 _ => None,
2591 }
2592 }
2593
2594 pub fn materialized_view(&self) -> Option<&MaterializedView> {
2596 match self.item() {
2597 CatalogItem::MaterializedView(mv) => Some(mv),
2598 _ => None,
2599 }
2600 }
2601
2602 pub fn table(&self) -> Option<&Table> {
2604 match self.item() {
2605 CatalogItem::Table(tbl) => Some(tbl),
2606 _ => None,
2607 }
2608 }
2609
2610 pub fn source(&self) -> Option<&Source> {
2612 match self.item() {
2613 CatalogItem::Source(src) => Some(src),
2614 _ => None,
2615 }
2616 }
2617
2618 pub fn sink(&self) -> Option<&Sink> {
2620 match self.item() {
2621 CatalogItem::Sink(sink) => Some(sink),
2622 _ => None,
2623 }
2624 }
2625
2626 pub fn secret(&self) -> Option<&Secret> {
2628 match self.item() {
2629 CatalogItem::Secret(secret) => Some(secret),
2630 _ => None,
2631 }
2632 }
2633
2634 pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2635 match self.item() {
2636 CatalogItem::Connection(connection) => Ok(connection),
2637 _ => {
2638 let db_name = match self.name().qualifiers.database_spec {
2639 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2640 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2641 };
2642 Err(SqlCatalogError::UnknownConnection(format!(
2643 "{}{}.{}",
2644 db_name,
2645 self.name().qualifiers.schema_spec,
2646 self.name().item
2647 )))
2648 }
2649 }
2650 }
2651
2652 pub fn source_desc(
2655 &self,
2656 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2657 self.item.source_desc(self)
2658 }
2659
2660 pub fn is_connection(&self) -> bool {
2662 matches!(self.item(), CatalogItem::Connection(_))
2663 }
2664
2665 pub fn is_table(&self) -> bool {
2667 matches!(self.item(), CatalogItem::Table(_))
2668 }
2669
2670 pub fn is_source(&self) -> bool {
2673 matches!(self.item(), CatalogItem::Source(_))
2674 }
2675
2676 pub fn subsource_details(
2679 &self,
2680 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2681 match &self.item() {
2682 CatalogItem::Source(source) => match &source.data_source {
2683 DataSourceDesc::IngestionExport {
2684 ingestion_id,
2685 external_reference,
2686 details,
2687 data_config: _,
2688 } => Some((*ingestion_id, external_reference, details)),
2689 _ => None,
2690 },
2691 _ => None,
2692 }
2693 }
2694
2695 pub fn source_export_details(
2698 &self,
2699 ) -> Option<(
2700 CatalogItemId,
2701 &UnresolvedItemName,
2702 &SourceExportDetails,
2703 &SourceExportDataConfig<ReferencedConnection>,
2704 )> {
2705 match &self.item() {
2706 CatalogItem::Source(source) => match &source.data_source {
2707 DataSourceDesc::IngestionExport {
2708 ingestion_id,
2709 external_reference,
2710 details,
2711 data_config,
2712 } => Some((*ingestion_id, external_reference, details, data_config)),
2713 _ => None,
2714 },
2715 CatalogItem::Table(table) => match &table.data_source {
2716 TableDataSource::DataSource {
2717 desc:
2718 DataSourceDesc::IngestionExport {
2719 ingestion_id,
2720 external_reference,
2721 details,
2722 data_config,
2723 },
2724 timeline: _,
2725 } => Some((*ingestion_id, external_reference, details, data_config)),
2726 _ => None,
2727 },
2728 _ => None,
2729 }
2730 }
2731
2732 pub fn is_progress_source(&self) -> bool {
2734 self.item().is_progress_source()
2735 }
2736
2737 pub fn progress_id(&self) -> Option<CatalogItemId> {
2739 match &self.item() {
2740 CatalogItem::Source(source) => match &source.data_source {
2741 DataSourceDesc::Ingestion { .. } => Some(self.id),
2742 DataSourceDesc::OldSyntaxIngestion {
2743 progress_subsource, ..
2744 } => Some(*progress_subsource),
2745 DataSourceDesc::IngestionExport { .. }
2746 | DataSourceDesc::Introspection(_)
2747 | DataSourceDesc::Progress
2748 | DataSourceDesc::Webhook { .. } => None,
2749 },
2750 CatalogItem::Table(_)
2751 | CatalogItem::Log(_)
2752 | CatalogItem::View(_)
2753 | CatalogItem::MaterializedView(_)
2754 | CatalogItem::Sink(_)
2755 | CatalogItem::Index(_)
2756 | CatalogItem::Type(_)
2757 | CatalogItem::Func(_)
2758 | CatalogItem::Secret(_)
2759 | CatalogItem::Connection(_)
2760 | CatalogItem::ContinualTask(_) => None,
2761 }
2762 }
2763
2764 pub fn is_sink(&self) -> bool {
2766 matches!(self.item(), CatalogItem::Sink(_))
2767 }
2768
2769 pub fn is_materialized_view(&self) -> bool {
2771 matches!(self.item(), CatalogItem::MaterializedView(_))
2772 }
2773
2774 pub fn is_view(&self) -> bool {
2776 matches!(self.item(), CatalogItem::View(_))
2777 }
2778
2779 pub fn is_secret(&self) -> bool {
2781 matches!(self.item(), CatalogItem::Secret(_))
2782 }
2783
2784 pub fn is_introspection_source(&self) -> bool {
2786 matches!(self.item(), CatalogItem::Log(_))
2787 }
2788
2789 pub fn is_index(&self) -> bool {
2791 matches!(self.item(), CatalogItem::Index(_))
2792 }
2793
2794 pub fn is_continual_task(&self) -> bool {
2796 matches!(self.item(), CatalogItem::ContinualTask(_))
2797 }
2798
2799 pub fn is_relation(&self) -> bool {
2801 mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2802 }
2803
2804 pub fn references(&self) -> &ResolvedIds {
2807 self.item.references()
2808 }
2809
2810 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2816 self.item.uses()
2817 }
2818
2819 pub fn item(&self) -> &CatalogItem {
2821 &self.item
2822 }
2823
2824 pub fn id(&self) -> CatalogItemId {
2826 self.id
2827 }
2828
2829 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2831 self.item().global_ids()
2832 }
2833
2834 pub fn latest_global_id(&self) -> GlobalId {
2835 self.item().latest_global_id()
2836 }
2837
2838 pub fn oid(&self) -> u32 {
2840 self.oid
2841 }
2842
2843 pub fn name(&self) -> &QualifiedItemName {
2845 &self.name
2846 }
2847
2848 pub fn referenced_by(&self) -> &[CatalogItemId] {
2850 &self.referenced_by
2851 }
2852
2853 pub fn used_by(&self) -> &[CatalogItemId] {
2855 &self.used_by
2856 }
2857
2858 pub fn conn_id(&self) -> Option<&ConnectionId> {
2861 self.item.conn_id()
2862 }
2863
2864 pub fn owner_id(&self) -> &RoleId {
2866 &self.owner_id
2867 }
2868
2869 pub fn privileges(&self) -> &PrivilegeMap {
2871 &self.privileges
2872 }
2873
2874 pub fn comment_object_id(&self) -> CommentObjectId {
2876 use CatalogItemType::*;
2877 match self.item_type() {
2878 Table => CommentObjectId::Table(self.id),
2879 Source => CommentObjectId::Source(self.id),
2880 Sink => CommentObjectId::Sink(self.id),
2881 View => CommentObjectId::View(self.id),
2882 MaterializedView => CommentObjectId::MaterializedView(self.id),
2883 Index => CommentObjectId::Index(self.id),
2884 Func => CommentObjectId::Func(self.id),
2885 Connection => CommentObjectId::Connection(self.id),
2886 Type => CommentObjectId::Type(self.id),
2887 Secret => CommentObjectId::Secret(self.id),
2888 ContinualTask => CommentObjectId::ContinualTask(self.id),
2889 }
2890 }
2891}
2892
/// Comments attached to catalog objects.
///
/// The outer map is keyed by the commented object; the inner map is keyed by
/// an optional sub-component index (`None` or `Some(position)`) and holds the
/// comment text.
#[derive(Debug, Clone, Default)]
pub struct CommentsMap {
    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
}
2897
2898impl CommentsMap {
2899 pub fn update_comment(
2900 &mut self,
2901 object_id: CommentObjectId,
2902 sub_component: Option<usize>,
2903 comment: Option<String>,
2904 ) -> Option<String> {
2905 let object_comments = self.map.entry(object_id).or_default();
2906
2907 let (empty, prev) = if let Some(comment) = comment {
2909 let prev = object_comments.insert(sub_component, comment);
2910 (false, prev)
2911 } else {
2912 let prev = object_comments.remove(&sub_component);
2913 (object_comments.is_empty(), prev)
2914 };
2915
2916 if empty {
2918 self.map.remove(&object_id);
2919 }
2920
2921 prev
2923 }
2924
2925 pub fn drop_comments(
2931 &mut self,
2932 object_ids: &BTreeSet<CommentObjectId>,
2933 ) -> Vec<(CommentObjectId, Option<usize>, String)> {
2934 let mut removed_comments = Vec::new();
2935
2936 for object_id in object_ids {
2937 if let Some(comments) = self.map.remove(object_id) {
2938 let removed = comments
2939 .into_iter()
2940 .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
2941 removed_comments.extend(removed);
2942 }
2943 }
2944
2945 removed_comments
2946 }
2947
2948 pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
2949 self.map
2950 .iter()
2951 .map(|(id, comments)| {
2952 comments
2953 .iter()
2954 .map(|(pos, comment)| (*id, *pos, comment.as_str()))
2955 })
2956 .flatten()
2957 }
2958
2959 pub fn get_object_comments(
2960 &self,
2961 object_id: CommentObjectId,
2962 ) -> Option<&BTreeMap<Option<usize>, String>> {
2963 self.map.get(&object_id)
2964 }
2965}
2966
2967impl Serialize for CommentsMap {
2968 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2969 where
2970 S: serde::Serializer,
2971 {
2972 let comment_count = self
2973 .map
2974 .iter()
2975 .map(|(_object_id, comments)| comments.len())
2976 .sum();
2977
2978 let mut seq = serializer.serialize_seq(Some(comment_count))?;
2979 for (object_id, sub) in &self.map {
2980 for (sub_component, comment) in sub {
2981 seq.serialize_element(&(
2982 format!("{object_id:?}"),
2983 format!("{sub_component:?}"),
2984 comment,
2985 ))?;
2986 }
2987 }
2988 seq.end()
2989 }
2990}
2991
/// Default privileges, keyed by the [`DefaultPrivilegeObject`] they apply to.
///
/// Each object maps to the per-grantee privileges stored for it.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
pub struct DefaultPrivileges {
    // Map keys are serialized as strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
}
2997
/// Newtype around a map from grantee [`RoleId`] to the
/// [`DefaultPrivilegeAclItem`] stored for that grantee.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
struct RoleDefaultPrivileges(
    // Map keys are serialized as strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
);
3006
/// Gives read access to the wrapped map's API directly on the newtype.
impl Deref for RoleDefaultPrivileges {
    type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;

    /// Returns the wrapped map.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
3014
/// Gives mutable access to the wrapped map's API directly on the newtype.
impl DerefMut for RoleDefaultPrivileges {
    /// Returns the wrapped map mutably.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
3020
3021impl DefaultPrivileges {
3022 pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
3024 if privilege.acl_mode.is_empty() {
3025 return;
3026 }
3027
3028 let privileges = self.privileges.entry(object).or_default();
3029 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3030 default_privilege.acl_mode |= privilege.acl_mode;
3031 } else {
3032 privileges.insert(privilege.grantee, privilege);
3033 }
3034 }
3035
3036 pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
3038 if let Some(privileges) = self.privileges.get_mut(object) {
3039 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3040 default_privilege.acl_mode =
3041 default_privilege.acl_mode.difference(privilege.acl_mode);
3042 if default_privilege.acl_mode.is_empty() {
3043 privileges.remove(&privilege.grantee);
3044 }
3045 }
3046 if privileges.is_empty() {
3047 self.privileges.remove(object);
3048 }
3049 }
3050 }
3051
3052 pub fn get_privileges_for_grantee(
3055 &self,
3056 object: &DefaultPrivilegeObject,
3057 grantee: &RoleId,
3058 ) -> Option<&AclMode> {
3059 self.privileges
3060 .get(object)
3061 .and_then(|privileges| privileges.get(grantee))
3062 .map(|privilege| &privilege.acl_mode)
3063 }
3064
3065 pub fn get_applicable_privileges(
3067 &self,
3068 role_id: RoleId,
3069 database_id: Option<DatabaseId>,
3070 schema_id: Option<SchemaId>,
3071 object_type: mz_sql::catalog::ObjectType,
3072 ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
3073 let privilege_object_type = if object_type.is_relation() {
3077 mz_sql::catalog::ObjectType::Table
3078 } else {
3079 object_type
3080 };
3081 let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
3082
3083 [
3087 DefaultPrivilegeObject {
3088 role_id,
3089 database_id,
3090 schema_id,
3091 object_type: privilege_object_type,
3092 },
3093 DefaultPrivilegeObject {
3094 role_id,
3095 database_id,
3096 schema_id: None,
3097 object_type: privilege_object_type,
3098 },
3099 DefaultPrivilegeObject {
3100 role_id,
3101 database_id: None,
3102 schema_id: None,
3103 object_type: privilege_object_type,
3104 },
3105 DefaultPrivilegeObject {
3106 role_id: RoleId::Public,
3107 database_id,
3108 schema_id,
3109 object_type: privilege_object_type,
3110 },
3111 DefaultPrivilegeObject {
3112 role_id: RoleId::Public,
3113 database_id,
3114 schema_id: None,
3115 object_type: privilege_object_type,
3116 },
3117 DefaultPrivilegeObject {
3118 role_id: RoleId::Public,
3119 database_id: None,
3120 schema_id: None,
3121 object_type: privilege_object_type,
3122 },
3123 ]
3124 .into_iter()
3125 .filter_map(|object| self.privileges.get(&object))
3126 .flat_map(|acl_map| acl_map.values())
3127 .fold(
3129 BTreeMap::new(),
3130 |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
3131 let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
3132 *accum_acl_mode |= *acl_mode;
3133 accum
3134 },
3135 )
3136 .into_iter()
3137 .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
3142 .filter(|(_, acl_mode)| !acl_mode.is_empty())
3144 .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
3145 grantee: *grantee,
3146 acl_mode,
3147 })
3148 }
3149
3150 pub fn iter(
3151 &self,
3152 ) -> impl Iterator<
3153 Item = (
3154 &DefaultPrivilegeObject,
3155 impl Iterator<Item = &DefaultPrivilegeAclItem>,
3156 ),
3157 > {
3158 self.privileges
3159 .iter()
3160 .map(|(object, acl_map)| (object, acl_map.values()))
3161 }
3162}
3163
/// In-memory configuration of a cluster.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterConfig {
    /// Whether the cluster is managed (with its settings) or unmanaged.
    pub variant: ClusterVariant,
    /// Optional workload class label attached to the cluster.
    pub workload_class: Option<String>,
}
3169
impl ClusterConfig {
    /// Returns the optimizer feature overrides of a managed cluster, or `None`
    /// for an unmanaged cluster.
    pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
        match &self.variant {
            ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
            ClusterVariant::Unmanaged => None,
        }
    }
}
3178
3179impl From<ClusterConfig> for durable::ClusterConfig {
3180 fn from(config: ClusterConfig) -> Self {
3181 Self {
3182 variant: config.variant.into(),
3183 workload_class: config.workload_class,
3184 }
3185 }
3186}
3187
/// Converts the durable cluster config into its in-memory representation.
impl From<durable::ClusterConfig> for ClusterConfig {
    fn from(config: durable::ClusterConfig) -> Self {
        Self {
            // The variant has its own durable <-> in-memory conversion.
            variant: config.variant.into(),
            workload_class: config.workload_class,
        }
    }
}
3196
/// Settings of a managed cluster.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterVariantManaged {
    /// Name of the cluster's size.
    pub size: String,
    /// Availability zones configured for the cluster.
    pub availability_zones: Vec<String>,
    /// Logging configuration for the cluster's replicas.
    pub logging: ReplicaLogging,
    /// Configured replication factor.
    pub replication_factor: u32,
    /// Optimizer feature overrides configured for the cluster.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// The cluster's schedule.
    pub schedule: ClusterSchedule,
}
3206
3207impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3208 fn from(managed: ClusterVariantManaged) -> Self {
3209 Self {
3210 size: managed.size,
3211 availability_zones: managed.availability_zones,
3212 logging: managed.logging,
3213 replication_factor: managed.replication_factor,
3214 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3215 schedule: managed.schedule,
3216 }
3217 }
3218}
3219
/// Converts durable managed-cluster settings into their in-memory
/// representation.
impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
    fn from(managed: durable::ClusterVariantManaged) -> Self {
        Self {
            size: managed.size,
            availability_zones: managed.availability_zones,
            logging: managed.logging,
            replication_factor: managed.replication_factor,
            // Only the feature overrides need a type conversion.
            optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
            schedule: managed.schedule,
        }
    }
}
3232
/// Distinguishes managed clusters, which carry their settings, from unmanaged
/// ones.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterVariant {
    /// A managed cluster together with its settings.
    Managed(ClusterVariantManaged),
    /// An unmanaged cluster; no settings are stored here.
    Unmanaged,
}
3238
/// Converts the in-memory cluster variant into its durable representation,
/// variant for variant.
impl From<ClusterVariant> for durable::ClusterVariant {
    fn from(variant: ClusterVariant) -> Self {
        match variant {
            ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
            ClusterVariant::Unmanaged => Self::Unmanaged,
        }
    }
}
3247
/// Converts the durable cluster variant into its in-memory representation,
/// variant for variant.
impl From<durable::ClusterVariant> for ClusterVariant {
    fn from(variant: durable::ClusterVariant) -> Self {
        match variant {
            durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
            durable::ClusterVariant::Unmanaged => Self::Unmanaged,
        }
    }
}
3256
/// Exposes [`Database`] through the SQL layer's `CatalogDatabase` trait.
impl mz_sql::catalog::CatalogDatabase for Database {
    /// Returns the database's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the database's id.
    fn id(&self) -> DatabaseId {
        self.id
    }

    /// Reports whether the database contains at least one schema.
    fn has_schemas(&self) -> bool {
        !self.schemas_by_name.is_empty()
    }

    /// Returns the database's schemas as a name-to-id map.
    fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
        &self.schemas_by_name
    }

    // The `as` cast is only an unsizing cast to a trait object.
    #[allow(clippy::as_conversions)]
    /// Returns all schemas of the database as trait objects.
    fn schemas(&self) -> Vec<&dyn CatalogSchema> {
        self.schemas_by_id
            .values()
            .map(|schema| schema as &dyn CatalogSchema)
            .collect()
    }

    /// Returns the id of the role owning the database.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges granted on the database.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3291
/// Exposes [`Schema`] through the SQL layer's `CatalogSchema` trait.
impl mz_sql::catalog::CatalogSchema for Schema {
    /// Returns the specifier of the database this schema belongs to.
    fn database(&self) -> &ResolvedDatabaseSpecifier {
        &self.name.database
    }

    /// Returns the schema's qualified name.
    fn name(&self) -> &QualifiedSchemaName {
        &self.name
    }

    /// Returns the schema's specifier.
    fn id(&self) -> &SchemaSpecifier {
        &self.id
    }

    /// Reports whether the `items` map is non-empty; functions and types are
    /// not consulted.
    fn has_items(&self) -> bool {
        !self.items.is_empty()
    }

    /// Iterates over the ids of everything in the schema: items first, then
    /// functions, then types.
    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
        Box::new(
            self.items
                .values()
                .chain(self.functions.values())
                .chain(self.types.values())
                .copied(),
        )
    }

    /// Returns the id of the role owning the schema.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges granted on the schema.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3327
/// Exposes [`Role`] through the SQL layer's `CatalogRole` trait.
impl mz_sql::catalog::CatalogRole for Role {
    /// Returns the role's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the role's id.
    fn id(&self) -> RoleId {
        self.id
    }

    /// Returns the role's membership map.
    // NOTE(review): the key/value meaning (presumably role -> grantor) is not
    // visible in this block — confirm against `RoleMembership`.
    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
        &self.membership.map
    }

    /// Returns the role's attributes.
    fn attributes(&self) -> &RoleAttributes {
        &self.attributes
    }

    /// Returns the role's variables as a name-to-value map.
    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
        &self.vars.map
    }
}
3349
/// Exposes [`NetworkPolicy`] through the SQL layer's `CatalogNetworkPolicy`
/// trait.
impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
    /// Returns the network policy's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the network policy's id.
    fn id(&self) -> NetworkPolicyId {
        self.id
    }

    /// Returns the id of the role owning the network policy.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges granted on the network policy.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3367
/// Exposes [`Cluster`] through the SQL layer's `CatalogCluster` trait.
///
/// Several methods call a same-named method on `self`.
/// NOTE(review): these are presumably inherent `Cluster` methods, which take
/// precedence over trait methods — otherwise they would recurse; confirm.
impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
    /// Returns the cluster's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the cluster's id.
    fn id(&self) -> ClusterId {
        self.id
    }

    /// Returns the ids of the objects bound to this cluster.
    fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
        &self.bound_objects
    }

    /// Returns the cluster's replicas as a name-to-id map.
    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
        &self.replica_id_by_name_
    }

    // The `as` cast is only an unsizing cast to a trait object.
    #[allow(clippy::as_conversions)]
    /// Returns all replicas of the cluster as trait objects.
    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
        self.replicas()
            .map(|replica| replica as &dyn CatalogClusterReplica)
            .collect()
    }

    /// Returns the replica with the given id.
    ///
    /// # Panics
    ///
    /// Panics with "catalog out of sync" if the cluster has no such replica.
    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
        self.replica(id).expect("catalog out of sync")
    }

    /// Returns the id of the role owning the cluster.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges granted on the cluster.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }

    /// Reports whether the cluster is managed.
    fn is_managed(&self) -> bool {
        self.is_managed()
    }

    /// Returns the size of a managed cluster, or `None` if unmanaged.
    fn managed_size(&self) -> Option<&str> {
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
            _ => None,
        }
    }

    /// Returns the schedule of a managed cluster, or `None` if unmanaged.
    fn schedule(&self) -> Option<&ClusterSchedule> {
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
            _ => None,
        }
    }

    /// Attempts to turn this cluster into a [`CreateClusterPlan`].
    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
        self.try_to_plan()
    }
}
3427
/// Exposes [`ClusterReplica`] through the SQL layer's `CatalogClusterReplica`
/// trait.
impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
    /// Returns the replica's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the id of the cluster this replica belongs to.
    fn cluster_id(&self) -> ClusterId {
        self.cluster_id
    }

    /// Returns the replica's id.
    fn replica_id(&self) -> ReplicaId {
        self.replica_id
    }

    /// Returns the id of the role owning the replica.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Reports whether the replica's configured location is internal.
    fn internal(&self) -> bool {
        self.config.location.internal()
    }
}
3449
3450impl mz_sql::catalog::CatalogItem for CatalogEntry {
3451 fn name(&self) -> &QualifiedItemName {
3452 self.name()
3453 }
3454
3455 fn id(&self) -> CatalogItemId {
3456 self.id()
3457 }
3458
3459 fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3460 Box::new(self.global_ids())
3461 }
3462
3463 fn oid(&self) -> u32 {
3464 self.oid()
3465 }
3466
3467 fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3468 self.func()
3469 }
3470
3471 fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3472 self.source_desc()
3473 }
3474
3475 fn connection(
3476 &self,
3477 ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3478 {
3479 Ok(self.connection()?.details.to_connection())
3480 }
3481
3482 fn create_sql(&self) -> &str {
3483 match self.item() {
3484 CatalogItem::Table(Table { create_sql, .. }) => {
3485 create_sql.as_deref().unwrap_or("<builtin>")
3486 }
3487 CatalogItem::Source(Source { create_sql, .. }) => {
3488 create_sql.as_deref().unwrap_or("<builtin>")
3489 }
3490 CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3491 CatalogItem::View(View { create_sql, .. }) => create_sql,
3492 CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3493 CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3494 CatalogItem::Type(Type { create_sql, .. }) => {
3495 create_sql.as_deref().unwrap_or("<builtin>")
3496 }
3497 CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3498 CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3499 CatalogItem::Func(_) => "<builtin>",
3500 CatalogItem::Log(_) => "<builtin>",
3501 CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
3502 }
3503 }
3504
3505 fn item_type(&self) -> SqlCatalogItemType {
3506 self.item().typ()
3507 }
3508
3509 fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3510 if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3511 Some((keys, *on))
3512 } else {
3513 None
3514 }
3515 }
3516
3517 fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3518 if let CatalogItem::Table(Table {
3519 data_source: TableDataSource::TableWrites { defaults },
3520 ..
3521 }) = self.item()
3522 {
3523 Some(defaults.as_slice())
3524 } else {
3525 None
3526 }
3527 }
3528
3529 fn replacement_target(&self) -> Option<CatalogItemId> {
3530 if let CatalogItem::MaterializedView(mv) = self.item() {
3531 mv.replacement_target
3532 } else {
3533 None
3534 }
3535 }
3536
3537 fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3538 if let CatalogItem::Type(Type { details, .. }) = self.item() {
3539 Some(details)
3540 } else {
3541 None
3542 }
3543 }
3544
3545 fn references(&self) -> &ResolvedIds {
3546 self.references()
3547 }
3548
3549 fn uses(&self) -> BTreeSet<CatalogItemId> {
3550 self.uses()
3551 }
3552
3553 fn referenced_by(&self) -> &[CatalogItemId] {
3554 self.referenced_by()
3555 }
3556
3557 fn used_by(&self) -> &[CatalogItemId] {
3558 self.used_by()
3559 }
3560
3561 fn subsource_details(
3562 &self,
3563 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3564 self.subsource_details()
3565 }
3566
3567 fn source_export_details(
3568 &self,
3569 ) -> Option<(
3570 CatalogItemId,
3571 &UnresolvedItemName,
3572 &SourceExportDetails,
3573 &SourceExportDataConfig<ReferencedConnection>,
3574 )> {
3575 self.source_export_details()
3576 }
3577
3578 fn is_progress_source(&self) -> bool {
3579 self.is_progress_source()
3580 }
3581
3582 fn progress_id(&self) -> Option<CatalogItemId> {
3583 self.progress_id()
3584 }
3585
3586 fn owner_id(&self) -> RoleId {
3587 self.owner_id
3588 }
3589
3590 fn privileges(&self) -> &PrivilegeMap {
3591 &self.privileges
3592 }
3593
3594 fn cluster_id(&self) -> Option<ClusterId> {
3595 self.item().cluster_id()
3596 }
3597
3598 fn at_version(
3599 &self,
3600 version: RelationVersionSelector,
3601 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3602 Box::new(CatalogCollectionEntry {
3603 entry: self.clone(),
3604 version,
3605 })
3606 }
3607
3608 fn latest_version(&self) -> Option<RelationVersion> {
3609 self.table().map(|t| t.desc.latest_version())
3610 }
3611}
3612
/// A single update to catalog state: what changed, when, and in which
/// direction.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdate {
    /// The object the update is about.
    pub kind: StateUpdateKind,
    /// The timestamp of the update.
    pub ts: Timestamp,
    /// Whether the object is being added or retracted.
    pub diff: StateDiff,
}
3620
/// The kinds of object a [`StateUpdate`] can carry.
///
/// Most variants wrap the corresponding durable catalog object;
/// `SystemPrivilege` wraps an [`MzAclItem`] and `TemporaryItem` wraps the
/// in-memory [`TemporaryItem`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum StateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    // The only variant with no counterpart in `BootstrapStateUpdateKind`.
    TemporaryItem(TemporaryItem),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3650
/// Direction of a [`StateUpdate`]. Converts to and from [`Diff`] as
/// `MINUS_ONE` (retraction) and `ONE` (addition).
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub enum StateDiff {
    /// The object is removed.
    Retraction,
    /// The object is added.
    Addition,
}
3657
/// Maps a [`StateDiff`] onto the numeric [`Diff`]: retractions become
/// `Diff::MINUS_ONE`, additions `Diff::ONE`.
impl From<StateDiff> for Diff {
    fn from(diff: StateDiff) -> Self {
        match diff {
            StateDiff::Retraction => Diff::MINUS_ONE,
            StateDiff::Addition => Diff::ONE,
        }
    }
}
/// Fallible inverse of `From<StateDiff> for Diff`: only `Diff::MINUS_ONE` and
/// `Diff::ONE` have a [`StateDiff`] counterpart.
impl TryFrom<Diff> for StateDiff {
    type Error = String;

    /// Converts `diff`, returning an `"invalid diff …"` message for any value
    /// other than `MINUS_ONE` or `ONE`.
    fn try_from(diff: Diff) -> Result<Self, Self::Error> {
        match diff {
            Diff::MINUS_ONE => Ok(Self::Retraction),
            Diff::ONE => Ok(Self::Addition),
            diff => Err(format!("invalid diff {diff}")),
        }
    }
}
3677
/// Flattened, serialized-style form of a [`CatalogEntry`], carried by
/// [`StateUpdateKind::TemporaryItem`].
#[derive(Debug, Clone, Ord, PartialOrd, PartialEq, Eq)]
pub struct TemporaryItem {
    /// The item's catalog id.
    pub id: CatalogItemId,
    /// The item's OID.
    pub oid: u32,
    /// The item's primary global id.
    pub global_id: GlobalId,
    /// Id of the schema containing the item.
    pub schema_id: SchemaId,
    /// The item's unqualified name.
    pub name: String,
    /// The connection id reported by the item, if any.
    pub conn_id: Option<ConnectionId>,
    /// The item's `CREATE` SQL.
    pub create_sql: String,
    /// Id of the role owning the item.
    pub owner_id: RoleId,
    /// All privileges granted on the item.
    pub privileges: Vec<MzAclItem>,
    /// Global ids of any versions beyond the primary one.
    pub extra_versions: BTreeMap<RelationVersion, GlobalId>,
}
3692
3693impl From<CatalogEntry> for TemporaryItem {
3694 fn from(entry: CatalogEntry) -> Self {
3695 let conn_id = entry.conn_id().cloned();
3696 let (create_sql, global_id, extra_versions) = entry.item.to_serialized();
3697
3698 TemporaryItem {
3699 id: entry.id,
3700 oid: entry.oid,
3701 global_id,
3702 schema_id: entry.name.qualifiers.schema_spec.into(),
3703 name: entry.name.item,
3704 conn_id,
3705 create_sql,
3706 owner_id: entry.owner_id,
3707 privileges: entry.privileges.into_all_values().collect(),
3708 extra_versions,
3709 }
3710 }
3711}
3712
impl TemporaryItem {
    /// Returns the item's type, derived from its `create_sql` via the
    /// file-level `item_type` helper.
    pub fn item_type(&self) -> CatalogItemType {
        item_type(&self.create_sql)
    }
}
3718
/// The kinds of object a bootstrap state update can carry.
///
/// Mirrors [`StateUpdateKind`] variant for variant, except that there is no
/// `TemporaryItem` variant.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum BootstrapStateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3742
3743impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3744 fn from(value: BootstrapStateUpdateKind) -> Self {
3745 match value {
3746 BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3747 BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3748 BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3749 BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3750 BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3751 StateUpdateKind::DefaultPrivilege(kind)
3752 }
3753 BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3754 StateUpdateKind::SystemPrivilege(kind)
3755 }
3756 BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3757 StateUpdateKind::SystemConfiguration(kind)
3758 }
3759 BootstrapStateUpdateKind::SourceReferences(kind) => {
3760 StateUpdateKind::SourceReferences(kind)
3761 }
3762 BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3763 BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3764 BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3765 StateUpdateKind::IntrospectionSourceIndex(kind)
3766 }
3767 BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3768 BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3769 StateUpdateKind::SystemObjectMapping(kind)
3770 }
3771 BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3772 BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3773 BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3774 BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3775 StateUpdateKind::StorageCollectionMetadata(kind)
3776 }
3777 BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3778 StateUpdateKind::UnfinalizedShard(kind)
3779 }
3780 }
3781 }
3782}
3783
3784impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3785 type Error = TemporaryItem;
3786
3787 fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3788 match value {
3789 StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3790 StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3791 StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3792 StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3793 StateUpdateKind::DefaultPrivilege(kind) => {
3794 Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3795 }
3796 StateUpdateKind::SystemPrivilege(kind) => {
3797 Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3798 }
3799 StateUpdateKind::SystemConfiguration(kind) => {
3800 Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3801 }
3802 StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3803 StateUpdateKind::NetworkPolicy(kind) => {
3804 Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3805 }
3806 StateUpdateKind::IntrospectionSourceIndex(kind) => {
3807 Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3808 }
3809 StateUpdateKind::ClusterReplica(kind) => {
3810 Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3811 }
3812 StateUpdateKind::SourceReferences(kind) => {
3813 Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3814 }
3815 StateUpdateKind::SystemObjectMapping(kind) => {
3816 Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3817 }
3818 StateUpdateKind::TemporaryItem(kind) => Err(kind),
3819 StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3820 StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3821 StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3822 StateUpdateKind::StorageCollectionMetadata(kind) => {
3823 Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3824 }
3825 StateUpdateKind::UnfinalizedShard(kind) => {
3826 Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3827 }
3828 }
3829 }
3830}