1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_compute_client::logging::LogVariant;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_compute_types::plan::Plan as ComputePlan;
26use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
27use mz_controller_types::{ClusterId, ReplicaId};
28use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
29use mz_ore::collections::CollectionExt;
30use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
31use mz_repr::network_policy_id::NetworkPolicyId;
32use mz_repr::optimize::OptimizerFeatureOverrides;
33use mz_repr::refresh_schedule::RefreshSchedule;
34use mz_repr::role_id::RoleId;
35use mz_repr::{
36 CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
37 RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
38};
39use mz_sql::ast::display::AstDisplay;
40use mz_sql::ast::{
41 ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
42 UnresolvedItemName, Value, WithOptionValue,
43};
44use mz_sql::catalog::{
45 CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
46 CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogType,
47 CatalogTypeDetails, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference,
48 RoleAttributes, RoleMembership, RoleVars, SystemObjectType,
49};
50use mz_sql::names::{
51 Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
52 QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
53};
54use mz_sql::plan::{
55 ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
56 CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
57 HirRelationExpr, NetworkPolicyRule, PlanError, WebhookBodyFormat, WebhookHeaders,
58 WebhookValidation,
59};
60use mz_sql::rbac;
61use mz_sql::session::vars::OwnedVarInput;
62use mz_storage_client::controller::IntrospectionType;
63use mz_storage_types::connections::inline::ReferencedConnection;
64use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
65use mz_storage_types::sources::load_generator::LoadGenerator;
66use mz_storage_types::sources::{
67 GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
68 SourceExportDetails, Timeline,
69};
70use mz_transform::dataflow::DataflowMetainfo;
71use mz_transform::notice::OptimizerNotice;
72use serde::ser::SerializeSeq;
73use serde::{Deserialize, Serialize};
74use timely::progress::Antichain;
75use tracing::debug;
76
77use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
78use crate::durable;
79use crate::durable::objects::item_type;
80
/// Updates an existing value in place from an owned `T`.
///
/// Implementors must also be constructible from `T` (the `From<T>` supertrait),
/// so a value can either be created from `T` or refreshed from it.
pub trait UpdateFrom<T>: From<T> {
    /// Overwrites the relevant parts of `self` with the contents of `from`,
    /// consuming `from`.
    fn update_from(&mut self, from: T);
}
85
/// In-memory representation of a database.
///
/// Converts to and from [`durable::Database`]. The two schema maps have no
/// counterpart in the durable form: they start out empty when a `Database` is
/// built from one, and `update_from` leaves them untouched.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    pub name: String,
    pub id: DatabaseId,
    pub oid: u32,
    /// Schemas of this database keyed by their ID; serialized through
    /// `mz_ore::serde::map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Schema IDs keyed by a `String` (presumably the schema name).
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
97
98impl From<Database> for durable::Database {
99 fn from(database: Database) -> durable::Database {
100 durable::Database {
101 id: database.id,
102 oid: database.oid,
103 name: database.name,
104 owner_id: database.owner_id,
105 privileges: database.privileges.into_all_values().collect(),
106 }
107 }
108}
109
110impl From<durable::Database> for Database {
111 fn from(
112 durable::Database {
113 id,
114 oid,
115 name,
116 owner_id,
117 privileges,
118 }: durable::Database,
119 ) -> Database {
120 Database {
121 id,
122 oid,
123 schemas_by_id: BTreeMap::new(),
124 schemas_by_name: BTreeMap::new(),
125 name,
126 owner_id,
127 privileges: PrivilegeMap::from_mz_acl_items(privileges),
128 }
129 }
130}
131
132impl UpdateFrom<durable::Database> for Database {
133 fn update_from(
134 &mut self,
135 durable::Database {
136 id,
137 oid,
138 name,
139 owner_id,
140 privileges,
141 }: durable::Database,
142 ) {
143 self.id = id;
144 self.oid = oid;
145 self.name = name;
146 self.owner_id = owner_id;
147 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
148 }
149}
150
/// In-memory representation of a schema.
///
/// Converts to and from [`durable::Schema`]. The `items`, `functions`, and
/// `types` maps have no counterpart in the durable form: they start out empty
/// when a `Schema` is built from one, and `update_from` leaves them untouched.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
    /// Schema name together with the database it is qualified by.
    pub name: QualifiedSchemaName,
    pub id: SchemaSpecifier,
    pub oid: u32,
    /// Catalog item IDs keyed by a `String` (presumably the item name).
    pub items: BTreeMap<String, CatalogItemId>,
    /// Catalog item IDs of functions keyed by a `String` (presumably the name).
    pub functions: BTreeMap<String, CatalogItemId>,
    /// Catalog item IDs of types keyed by a `String` (presumably the name).
    pub types: BTreeMap<String, CatalogItemId>,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
162
163impl From<Schema> for durable::Schema {
164 fn from(schema: Schema) -> durable::Schema {
165 durable::Schema {
166 id: schema.id.into(),
167 oid: schema.oid,
168 name: schema.name.schema,
169 database_id: schema.name.database.id(),
170 owner_id: schema.owner_id,
171 privileges: schema.privileges.into_all_values().collect(),
172 }
173 }
174}
175
176impl From<durable::Schema> for Schema {
177 fn from(
178 durable::Schema {
179 id,
180 oid,
181 name,
182 database_id,
183 owner_id,
184 privileges,
185 }: durable::Schema,
186 ) -> Schema {
187 Schema {
188 name: QualifiedSchemaName {
189 database: database_id.into(),
190 schema: name,
191 },
192 id: id.into(),
193 oid,
194 items: BTreeMap::new(),
195 functions: BTreeMap::new(),
196 types: BTreeMap::new(),
197 owner_id,
198 privileges: PrivilegeMap::from_mz_acl_items(privileges),
199 }
200 }
201}
202
203impl UpdateFrom<durable::Schema> for Schema {
204 fn update_from(
205 &mut self,
206 durable::Schema {
207 id,
208 oid,
209 name,
210 database_id,
211 owner_id,
212 privileges,
213 }: durable::Schema,
214 ) {
215 self.name = QualifiedSchemaName {
216 database: database_id.into(),
217 schema: name,
218 };
219 self.id = id.into();
220 self.oid = oid;
221 self.owner_id = owner_id;
222 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
223 }
224}
225
/// In-memory representation of a role.
///
/// Carries the same six fields as [`durable::Role`], to and from which it
/// converts field by field.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    pub name: String,
    pub id: RoleId,
    pub oid: u32,
    pub attributes: RoleAttributes,
    pub membership: RoleMembership,
    /// Per-role variables; iterate them with [`Role::vars`].
    pub vars: RoleVars,
}
235
236impl Role {
237 pub fn is_user(&self) -> bool {
238 self.id.is_user()
239 }
240
241 pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
242 self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
243 }
244}
245
246impl From<Role> for durable::Role {
247 fn from(role: Role) -> durable::Role {
248 durable::Role {
249 id: role.id,
250 oid: role.oid,
251 name: role.name,
252 attributes: role.attributes,
253 membership: role.membership,
254 vars: role.vars,
255 }
256 }
257}
258
259impl From<durable::Role> for Role {
260 fn from(
261 durable::Role {
262 id,
263 oid,
264 name,
265 attributes,
266 membership,
267 vars,
268 }: durable::Role,
269 ) -> Self {
270 Role {
271 name,
272 id,
273 oid,
274 attributes,
275 membership,
276 vars,
277 }
278 }
279}
280
281impl UpdateFrom<durable::Role> for Role {
282 fn update_from(
283 &mut self,
284 durable::Role {
285 id,
286 oid,
287 name,
288 attributes,
289 membership,
290 vars,
291 }: durable::Role,
292 ) {
293 self.id = id;
294 self.oid = oid;
295 self.name = name;
296 self.attributes = attributes;
297 self.membership = membership;
298 self.vars = vars;
299 }
300}
301
/// In-memory representation of a role's authentication data.
///
/// Carries the same three fields as [`durable::RoleAuth`].
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RoleAuth {
    /// The role this authentication data belongs to.
    pub role_id: RoleId,
    /// The stored password hash, if any.
    pub password_hash: Option<String>,
    // NOTE(review): the unit/epoch of `updated_at` is not visible here — confirm.
    pub updated_at: u64,
}
308
309impl From<RoleAuth> for durable::RoleAuth {
310 fn from(role_auth: RoleAuth) -> durable::RoleAuth {
311 durable::RoleAuth {
312 role_id: role_auth.role_id,
313 password_hash: role_auth.password_hash,
314 updated_at: role_auth.updated_at,
315 }
316 }
317}
318
319impl From<durable::RoleAuth> for RoleAuth {
320 fn from(
321 durable::RoleAuth {
322 role_id,
323 password_hash,
324 updated_at,
325 }: durable::RoleAuth,
326 ) -> RoleAuth {
327 RoleAuth {
328 role_id,
329 password_hash,
330 updated_at,
331 }
332 }
333}
334
335impl UpdateFrom<durable::RoleAuth> for RoleAuth {
336 fn update_from(&mut self, from: durable::RoleAuth) {
337 self.role_id = from.role_id;
338 self.password_hash = from.password_hash;
339 self.updated_at = from.updated_at;
340 }
341}
342
/// In-memory representation of a cluster.
///
/// Converts to and from [`durable::Cluster`]. `log_indexes`, `bound_objects`,
/// and the two replica maps have no counterpart in the durable form: they
/// start out empty when a `Cluster` is built from one, and `update_from`
/// leaves them untouched.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    pub name: String,
    pub id: ClusterId,
    pub config: ClusterConfig,
    /// Global IDs keyed by log variant; excluded from serialization.
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    /// Catalog items associated with this cluster.
    pub bound_objects: BTreeSet<CatalogItemId>,
    /// Replica IDs keyed by name; read through [`Cluster::replica_id`].
    // NOTE(review): the trailing underscore presumably discourages direct
    // access in favor of the accessor methods — confirm.
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    /// Replicas keyed by ID; read through [`Cluster::replicas`] and
    /// [`Cluster::replica`]. Serialized through
    /// `mz_ore::serde::map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
359
360impl Cluster {
361 pub fn role(&self) -> ClusterRole {
363 if self.name == MZ_SYSTEM_CLUSTER.name {
366 ClusterRole::SystemCritical
367 } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
368 ClusterRole::System
369 } else {
370 ClusterRole::User
371 }
372 }
373
374 pub fn is_managed(&self) -> bool {
376 matches!(self.config.variant, ClusterVariant::Managed { .. })
377 }
378
379 pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
381 self.replicas().filter(|r| !r.config.location.internal())
382 }
383
384 pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
386 self.replicas_by_id_.values()
387 }
388
389 pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
391 self.replicas_by_id_.get(&replica_id)
392 }
393
394 pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
396 self.replica_id_by_name_.get(name).copied()
397 }
398
399 pub fn availability_zones(&self) -> Option<&[String]> {
401 match &self.config.variant {
402 ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
403 ClusterVariant::Unmanaged => None,
404 }
405 }
406
407 pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
408 let name = self.name.clone();
409 let variant = match &self.config.variant {
410 ClusterVariant::Managed(ClusterVariantManaged {
411 size,
412 availability_zones,
413 logging,
414 replication_factor,
415 optimizer_feature_overrides,
416 schedule,
417 }) => {
418 let introspection = match logging {
419 ReplicaLogging {
420 log_logging,
421 interval: Some(interval),
422 } => Some(ComputeReplicaIntrospectionConfig {
423 debugging: *log_logging,
424 interval: interval.clone(),
425 }),
426 ReplicaLogging {
427 log_logging: _,
428 interval: None,
429 } => None,
430 };
431 let compute = ComputeReplicaConfig { introspection };
432 CreateClusterVariant::Managed(CreateClusterManagedPlan {
433 replication_factor: replication_factor.clone(),
434 size: size.clone(),
435 availability_zones: availability_zones.clone(),
436 compute,
437 optimizer_feature_overrides: optimizer_feature_overrides.clone(),
438 schedule: schedule.clone(),
439 })
440 }
441 ClusterVariant::Unmanaged => {
442 return Err(PlanError::Unsupported {
445 feature: "SHOW CREATE for unmanaged clusters".to_string(),
446 discussion_no: None,
447 });
448 }
449 };
450 let workload_class = self.config.workload_class.clone();
451 Ok(CreateClusterPlan {
452 name,
453 variant,
454 workload_class,
455 })
456 }
457}
458
459impl From<Cluster> for durable::Cluster {
460 fn from(cluster: Cluster) -> durable::Cluster {
461 durable::Cluster {
462 id: cluster.id,
463 name: cluster.name,
464 owner_id: cluster.owner_id,
465 privileges: cluster.privileges.into_all_values().collect(),
466 config: cluster.config.into(),
467 }
468 }
469}
470
471impl From<durable::Cluster> for Cluster {
472 fn from(
473 durable::Cluster {
474 id,
475 name,
476 owner_id,
477 privileges,
478 config,
479 }: durable::Cluster,
480 ) -> Self {
481 Cluster {
482 name: name.clone(),
483 id,
484 bound_objects: BTreeSet::new(),
485 log_indexes: BTreeMap::new(),
486 replica_id_by_name_: BTreeMap::new(),
487 replicas_by_id_: BTreeMap::new(),
488 owner_id,
489 privileges: PrivilegeMap::from_mz_acl_items(privileges),
490 config: config.into(),
491 }
492 }
493}
494
495impl UpdateFrom<durable::Cluster> for Cluster {
496 fn update_from(
497 &mut self,
498 durable::Cluster {
499 id,
500 name,
501 owner_id,
502 privileges,
503 config,
504 }: durable::Cluster,
505 ) {
506 self.id = id;
507 self.name = name;
508 self.owner_id = owner_id;
509 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
510 self.config = config.into();
511 }
512}
513
/// In-memory representation of a cluster replica.
///
/// Converts into [`durable::ClusterReplica`] field by field.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    pub name: String,
    /// The cluster this replica belongs to.
    pub cluster_id: ClusterId,
    pub replica_id: ReplicaId,
    pub config: ReplicaConfig,
    pub owner_id: RoleId,
}
522
523impl From<ClusterReplica> for durable::ClusterReplica {
524 fn from(replica: ClusterReplica) -> durable::ClusterReplica {
525 durable::ClusterReplica {
526 cluster_id: replica.cluster_id,
527 replica_id: replica.replica_id,
528 name: replica.name,
529 config: replica.config.into(),
530 owner_id: replica.owner_id,
531 }
532 }
533}
534
/// A cluster status paired with a UTC timestamp.
// NOTE(review): `time` is presumably when `status` was observed — confirm.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    pub status: ClusterStatus,
    pub time: DateTime<Utc>,
}
540
/// A set of source references together with the time they were last updated.
///
/// Converts to and from both the durable and the `mz_sql::plan`
/// representations. The durable form additionally needs a source ID, see
/// [`SourceReferences::to_durable`].
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
    // NOTE(review): the unit/epoch of `updated_at` is not visible here — confirm.
    pub updated_at: u64,
    pub references: Vec<SourceReference>,
}
546
/// A single source reference: a name, an optional namespace, and a list of
/// column names.
///
/// Converts to and from both the durable and the `mz_sql::plan`
/// representations, which carry the same three fields.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
    pub name: String,
    pub namespace: Option<String>,
    pub columns: Vec<String>,
}
553
554impl From<SourceReference> for durable::SourceReference {
555 fn from(source_reference: SourceReference) -> durable::SourceReference {
556 durable::SourceReference {
557 name: source_reference.name,
558 namespace: source_reference.namespace,
559 columns: source_reference.columns,
560 }
561 }
562}
563
564impl SourceReferences {
565 pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
566 durable::SourceReferences {
567 source_id,
568 updated_at: self.updated_at,
569 references: self.references.into_iter().map(Into::into).collect(),
570 }
571 }
572}
573
574impl From<durable::SourceReference> for SourceReference {
575 fn from(source_reference: durable::SourceReference) -> SourceReference {
576 SourceReference {
577 name: source_reference.name,
578 namespace: source_reference.namespace,
579 columns: source_reference.columns,
580 }
581 }
582}
583
584impl From<durable::SourceReferences> for SourceReferences {
585 fn from(source_references: durable::SourceReferences) -> SourceReferences {
586 SourceReferences {
587 updated_at: source_references.updated_at,
588 references: source_references
589 .references
590 .into_iter()
591 .map(|source_reference| source_reference.into())
592 .collect(),
593 }
594 }
595}
596
597impl From<mz_sql::plan::SourceReference> for SourceReference {
598 fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
599 SourceReference {
600 name: source_reference.name,
601 namespace: source_reference.namespace,
602 columns: source_reference.columns,
603 }
604 }
605}
606
607impl From<mz_sql::plan::SourceReferences> for SourceReferences {
608 fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
609 SourceReferences {
610 updated_at: source_references.updated_at,
611 references: source_references
612 .references
613 .into_iter()
614 .map(|source_reference| source_reference.into())
615 .collect(),
616 }
617 }
618}
619
620impl From<SourceReferences> for mz_sql::plan::SourceReferences {
621 fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
622 mz_sql::plan::SourceReferences {
623 updated_at: source_references.updated_at,
624 references: source_references
625 .references
626 .into_iter()
627 .map(|source_reference| source_reference.into())
628 .collect(),
629 }
630 }
631}
632
633impl From<SourceReference> for mz_sql::plan::SourceReference {
634 fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
635 mz_sql::plan::SourceReference {
636 name: source_reference.name,
637 namespace: source_reference.namespace,
638 columns: source_reference.columns,
639 }
640 }
641}
642
/// A single entry in the catalog: an item plus its identity, ownership, and
/// dependency bookkeeping.
///
/// Converts into [`durable::Item`]; `referenced_by` and `used_by` are not part
/// of that conversion.
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    /// The item held by this entry.
    pub item: CatalogItem,
    /// Excluded from serialization.
    // NOTE(review): presumably the IDs of items that reference this one — confirm.
    #[serde(skip)]
    pub referenced_by: Vec<CatalogItemId>,
    /// Excluded from serialization.
    // NOTE(review): presumably the IDs of items that depend on this one — confirm.
    #[serde(skip)]
    pub used_by: Vec<CatalogItemId>,
    pub id: CatalogItemId,
    pub oid: u32,
    pub name: QualifiedItemName,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
659
/// A [`CatalogEntry`] paired with a relation version selector.
///
/// The selector is what `relation_desc` and `global_id` pass to the wrapped
/// item to choose which version they describe. Dereferences to the wrapped
/// entry.
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
    /// The wrapped catalog entry.
    pub entry: CatalogEntry,
    /// Which version of the entry's relation this value refers to.
    pub version: RelationVersionSelector,
}
679
680impl CatalogCollectionEntry {
681 pub fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
682 self.item().relation_desc(self.version)
683 }
684}
685
686impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
687 fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
688 self.item().relation_desc(self.version)
689 }
690
691 fn global_id(&self) -> GlobalId {
692 self.entry
693 .item()
694 .global_id_for_version(self.version)
695 .expect("catalog corruption, missing version!")
696 }
697}
698
699impl Deref for CatalogCollectionEntry {
700 type Target = CatalogEntry;
701
702 fn deref(&self) -> &CatalogEntry {
703 &self.entry
704 }
705}
706
/// Implements the SQL-layer catalog item interface by delegating to the
/// wrapped [`CatalogEntry`].
///
/// Every method forwards to `self.entry`; only [`Self::at_version`] makes use
/// of the version concept, by producing a new entry pinned to the requested
/// version.
impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
    /// Forwards to the wrapped entry.
    fn name(&self) -> &QualifiedItemName {
        self.entry.name()
    }

    /// Forwards to the wrapped entry.
    fn id(&self) -> CatalogItemId {
        self.entry.id()
    }

    /// Boxes the iterator of global IDs returned by the wrapped entry.
    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.entry.global_ids())
    }

    /// Forwards to the wrapped entry.
    fn oid(&self) -> u32 {
        self.entry.oid()
    }

    /// Forwards to the wrapped entry, including its error.
    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.entry.func()
    }

    /// Forwards to the wrapped entry, including its error.
    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.entry.source_desc()
    }

    /// Forwards to the wrapped entry's implementation of this trait method.
    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        // Fully qualified so that the trait method is the one invoked.
        // NOTE(review): presumably `CatalogEntry` also has an inherent
        // `connection` method that would otherwise be selected — confirm.
        mz_sql::catalog::CatalogItem::connection(&self.entry)
    }

    /// Forwards to the wrapped entry.
    fn create_sql(&self) -> &str {
        self.entry.create_sql()
    }

    /// Forwards to the wrapped entry.
    fn item_type(&self) -> SqlCatalogItemType {
        self.entry.item_type()
    }

    /// Forwards to the wrapped entry.
    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        self.entry.index_details()
    }

    /// Forwards to the wrapped entry.
    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        self.entry.writable_table_details()
    }

    /// Forwards to the wrapped entry.
    fn replacement_target(&self) -> Option<CatalogItemId> {
        self.entry.replacement_target()
    }

    /// Forwards to the wrapped entry.
    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        self.entry.type_details()
    }

    /// Forwards to the wrapped entry.
    fn references(&self) -> &ResolvedIds {
        self.entry.references()
    }

    /// Forwards to the wrapped entry.
    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.entry.uses()
    }

    /// Forwards to the wrapped entry.
    fn referenced_by(&self) -> &[CatalogItemId] {
        self.entry.referenced_by()
    }

    /// Forwards to the wrapped entry.
    fn used_by(&self) -> &[CatalogItemId] {
        self.entry.used_by()
    }

    /// Forwards to the wrapped entry.
    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.entry.subsource_details()
    }

    /// Forwards to the wrapped entry.
    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.entry.source_export_details()
    }

    /// Forwards to the wrapped entry.
    fn is_progress_source(&self) -> bool {
        self.entry.is_progress_source()
    }

    /// Forwards to the wrapped entry.
    fn progress_id(&self) -> Option<CatalogItemId> {
        self.entry.progress_id()
    }

    /// Returns a copy of the owner ID that the wrapped entry hands out by
    /// reference.
    fn owner_id(&self) -> RoleId {
        *self.entry.owner_id()
    }

    /// Forwards to the wrapped entry.
    fn privileges(&self) -> &PrivilegeMap {
        self.entry.privileges()
    }

    /// Returns the cluster ID reported by the wrapped entry's item.
    fn cluster_id(&self) -> Option<ClusterId> {
        self.entry.item().cluster_id()
    }

    /// Returns a new boxed entry for the same catalog entry pinned to
    /// `version`. The wrapped entry is cloned; `self` is unchanged.
    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        Box::new(CatalogCollectionEntry {
            entry: self.entry.clone(),
            version,
        })
    }

    /// Forwards to the wrapped entry.
    fn latest_version(&self) -> Option<RelationVersion> {
        self.entry.latest_version()
    }
}
830
/// The kinds of item a [`CatalogEntry`] can hold, each wrapping the struct
/// that carries the kind-specific data.
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    Table(Table),
    Source(Source),
    Log(Log),
    View(View),
    MaterializedView(MaterializedView),
    Sink(Sink),
    Index(Index),
    Type(Type),
    Func(Func),
    Secret(Secret),
    Connection(Connection),
}
845
846impl From<CatalogEntry> for durable::Item {
847 fn from(entry: CatalogEntry) -> durable::Item {
848 let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
849 durable::Item {
850 id: entry.id,
851 oid: entry.oid,
852 global_id,
853 schema_id: entry.name.qualifiers.schema_spec.into(),
854 name: entry.name.item,
855 create_sql,
856 owner_id: entry.owner_id,
857 privileges: entry.privileges.into_all_values().collect(),
858 extra_versions,
859 }
860 }
861}
862
/// Catalog data for a table.
#[derive(Debug, Clone, Serialize)]
pub struct Table {
    /// The `CREATE` statement for the table, if there is one.
    pub create_sql: Option<String>,
    /// Versioned description of the table's relation.
    pub desc: VersionedRelationDesc,
    /// The global ID of the collection for each relation version. The entry
    /// with the greatest version is the one used for writes (see
    /// [`Table::global_id_writes`]). Serialized through
    /// `mz_ore::serde::map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// Excluded from serialization.
    // NOTE(review): presumably set only for connection-scoped (temporary)
    // tables — confirm.
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    pub resolved_ids: ResolvedIds,
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    pub is_retained_metrics_object: bool,
    /// Where the table's data comes from; also determines its timeline (see
    /// [`Table::timeline`]).
    pub data_source: TableDataSource,
}
887
888impl Table {
889 pub fn timeline(&self) -> Timeline {
890 match &self.data_source {
891 TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
894 TableDataSource::DataSource { timeline, .. } => timeline.clone(),
895 }
896 }
897
898 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
900 self.collections.values().copied()
901 }
902
903 pub fn global_id_writes(&self) -> GlobalId {
905 *self
906 .collections
907 .last_key_value()
908 .expect("at least one version of a table")
909 .1
910 }
911
912 pub fn collection_descs(
914 &self,
915 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
916 self.collections.iter().map(|(version, gid)| {
917 let desc = self
918 .desc
919 .at_version(RelationVersionSelector::Specific(*version));
920 (*gid, *version, desc)
921 })
922 }
923
924 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
926 let (version, _gid) = self
927 .collections
928 .iter()
929 .find(|(_version, gid)| *gid == id)
930 .expect("GlobalId to exist");
931 self.desc
932 .at_version(RelationVersionSelector::Specific(*version))
933 }
934}
935
/// Where a table's data comes from.
#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
    /// The table is populated by table writes; such tables are on the
    /// `EpochMilliseconds` timeline (see `Table::timeline`).
    TableWrites {
        /// Excluded from serialization.
        // NOTE(review): presumably one default-value expression per column — confirm.
        #[serde(skip)]
        defaults: Vec<Expr<Aug>>,
    },

    /// The table is populated from a described data source and carries that
    /// source's own timeline.
    DataSource {
        desc: DataSourceDesc,
        timeline: Timeline,
    },
}
951
/// Describes where the data of a source (or source-fed table) comes from.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum DataSourceDesc {
    /// An ingestion running on `cluster_id`. Reports no formats and no
    /// envelope of its own.
    Ingestion {
        desc: SourceDesc<ReferencedConnection>,
        cluster_id: ClusterId,
    },
    /// An ingestion that additionally carries its own export configuration
    /// and the ID of its progress subsource.
    // NOTE(review): presumably produced by the legacy `CREATE SOURCE` syntax — confirm.
    OldSyntaxIngestion {
        desc: SourceDesc<ReferencedConnection>,
        cluster_id: ClusterId,
        progress_subsource: CatalogItemId,
        data_config: SourceExportDataConfig<ReferencedConnection>,
        details: SourceExportDetails,
    },
    /// An export of the ingestion identified by `ingestion_id`.
    IngestionExport {
        ingestion_id: CatalogItemId,
        external_reference: UnresolvedItemName,
        details: SourceExportDetails,
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// Introspection data of the given type.
    Introspection(IntrospectionType),
    /// A progress collection; sources of this kind are created without a
    /// cluster (see `Source::new`).
    Progress,
    /// A webhook running on `cluster_id`.
    Webhook {
        validate_using: Option<WebhookValidation>,
        body_format: WebhookBodyFormat,
        headers: WebhookHeaders,
        cluster_id: ClusterId,
    },
    // NOTE(review): presumably data derived from the catalog itself — confirm.
    Catalog,
}
1000
1001impl From<IntrospectionType> for DataSourceDesc {
1002 fn from(typ: IntrospectionType) -> Self {
1003 Self::Introspection(typ)
1004 }
1005}
1006
1007impl DataSourceDesc {
1008 pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1010 match &self {
1011 DataSourceDesc::Ingestion { .. } => (None, None),
1012 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1013 match &data_config.encoding.as_ref() {
1014 Some(encoding) => match &encoding.key {
1015 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1016 None => (None, Some(encoding.value.type_())),
1017 },
1018 None => (None, None),
1019 }
1020 }
1021 DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1022 Some(encoding) => match &encoding.key {
1023 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1024 None => (None, Some(encoding.value.type_())),
1025 },
1026 None => (None, None),
1027 },
1028 DataSourceDesc::Introspection(_)
1029 | DataSourceDesc::Webhook { .. }
1030 | DataSourceDesc::Progress
1031 | DataSourceDesc::Catalog => (None, None),
1032 }
1033 }
1034
1035 pub fn envelope(&self) -> Option<&str> {
1037 fn envelope_string(envelope: &SourceEnvelope) -> &str {
1042 match envelope {
1043 SourceEnvelope::None(_) => "none",
1044 SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1045 mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1046 mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1047 "debezium"
1051 }
1052 mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1053 "upsert-value-err-inline"
1054 }
1055 },
1056 SourceEnvelope::CdcV2 => {
1057 "materialize"
1060 }
1061 }
1062 }
1063
1064 match self {
1065 DataSourceDesc::Ingestion { .. } => None,
1070 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1071 Some(envelope_string(&data_config.envelope))
1072 }
1073 DataSourceDesc::IngestionExport { data_config, .. } => {
1074 Some(envelope_string(&data_config.envelope))
1075 }
1076 DataSourceDesc::Introspection(_)
1077 | DataSourceDesc::Webhook { .. }
1078 | DataSourceDesc::Progress
1079 | DataSourceDesc::Catalog => None,
1080 }
1081 }
1082}
1083
/// Catalog data for a source.
#[derive(Debug, Clone, Serialize)]
pub struct Source {
    /// The `CREATE` statement for the source, if there is one.
    pub create_sql: Option<String>,
    /// The global ID of the source's collection.
    pub global_id: GlobalId,
    /// Where the source's data comes from; excluded from serialization.
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    /// Description of the source's relation.
    pub desc: RelationDesc,
    pub timeline: Timeline,
    pub resolved_ids: ResolvedIds,
    /// When built through `Source::new`, the compaction window of the plan
    /// takes precedence over the one passed in by the caller.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    pub is_retained_metrics_object: bool,
}
1107
1108impl Source {
1109 pub fn new(
1116 plan: CreateSourcePlan,
1117 global_id: GlobalId,
1118 resolved_ids: ResolvedIds,
1119 custom_logical_compaction_window: Option<CompactionWindow>,
1120 is_retained_metrics_object: bool,
1121 ) -> Source {
1122 Source {
1123 create_sql: Some(plan.source.create_sql),
1124 data_source: match plan.source.data_source {
1125 mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1126 desc,
1127 cluster_id: plan
1128 .in_cluster
1129 .expect("ingestion-based sources must be given a cluster ID"),
1130 },
1131 mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1132 desc,
1133 progress_subsource,
1134 data_config,
1135 details,
1136 } => DataSourceDesc::OldSyntaxIngestion {
1137 desc,
1138 cluster_id: plan
1139 .in_cluster
1140 .expect("ingestion-based sources must be given a cluster ID"),
1141 progress_subsource,
1142 data_config,
1143 details,
1144 },
1145 mz_sql::plan::DataSourceDesc::Progress => {
1146 assert!(
1147 plan.in_cluster.is_none(),
1148 "subsources must not have a host config or cluster_id defined"
1149 );
1150 DataSourceDesc::Progress
1151 }
1152 mz_sql::plan::DataSourceDesc::IngestionExport {
1153 ingestion_id,
1154 external_reference,
1155 details,
1156 data_config,
1157 } => {
1158 assert!(
1159 plan.in_cluster.is_none(),
1160 "subsources must not have a host config or cluster_id defined"
1161 );
1162 DataSourceDesc::IngestionExport {
1163 ingestion_id,
1164 external_reference,
1165 details,
1166 data_config,
1167 }
1168 }
1169 mz_sql::plan::DataSourceDesc::Webhook {
1170 validate_using,
1171 body_format,
1172 headers,
1173 cluster_id,
1174 } => {
1175 mz_ore::soft_assert_or_log!(
1176 cluster_id.is_none(),
1177 "cluster_id set at Source level for Webhooks"
1178 );
1179 DataSourceDesc::Webhook {
1180 validate_using,
1181 body_format,
1182 headers,
1183 cluster_id: plan
1184 .in_cluster
1185 .expect("webhook sources must be given a cluster ID"),
1186 }
1187 }
1188 },
1189 desc: plan.source.desc,
1190 global_id,
1191 timeline: plan.timeline,
1192 resolved_ids,
1193 custom_logical_compaction_window: plan
1194 .source
1195 .compaction_window
1196 .or(custom_logical_compaction_window),
1197 is_retained_metrics_object,
1198 }
1199 }
1200
1201 pub fn source_type(&self) -> &str {
1203 match &self.data_source {
1204 DataSourceDesc::Ingestion { desc, .. }
1205 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
1206 DataSourceDesc::Progress => "progress",
1207 DataSourceDesc::IngestionExport { .. } => "subsource",
1208 DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => "source",
1209 DataSourceDesc::Webhook { .. } => "webhook",
1210 }
1211 }
1212
1213 pub fn connection_id(&self) -> Option<CatalogItemId> {
1215 match &self.data_source {
1216 DataSourceDesc::Ingestion { desc, .. }
1217 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
1218 DataSourceDesc::IngestionExport { .. }
1219 | DataSourceDesc::Introspection(_)
1220 | DataSourceDesc::Webhook { .. }
1221 | DataSourceDesc::Progress
1222 | DataSourceDesc::Catalog => None,
1223 }
1224 }
1225
1226 pub fn global_id(&self) -> GlobalId {
1228 self.global_id
1229 }
1230
1231 pub fn user_controllable_persist_shard_count(&self) -> i64 {
1239 match &self.data_source {
1240 DataSourceDesc::Ingestion { .. } => 0,
1241 DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1242 match &desc.connection {
1243 GenericSourceConnection::Postgres(_)
1247 | GenericSourceConnection::MySql(_)
1248 | GenericSourceConnection::SqlServer(_) => 0,
1249 GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
1250 LoadGenerator::Clock
1252 | LoadGenerator::Counter { .. }
1253 | LoadGenerator::Datums
1254 | LoadGenerator::KeyValue(_) => 1,
1255 LoadGenerator::Auction
1256 | LoadGenerator::Marketing
1257 | LoadGenerator::Tpch { .. } => 0,
1258 },
1259 GenericSourceConnection::Kafka(_) => 1,
1260 }
1261 }
1262 DataSourceDesc::IngestionExport { .. } => 1,
1265 DataSourceDesc::Webhook { .. } => 1,
1266 DataSourceDesc::Introspection(_)
1269 | DataSourceDesc::Progress
1270 | DataSourceDesc::Catalog => 0,
1271 }
1272 }
1273}
1274
/// Catalog data for a log.
#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// Which log this is.
    pub variant: LogVariant,
    /// The global ID of the log's collection.
    pub global_id: GlobalId,
}
1282
1283impl Log {
1284 pub fn global_id(&self) -> GlobalId {
1286 self.global_id
1287 }
1288}
1289
/// Catalog data for a sink.
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    /// The `CREATE` statement for the sink.
    pub create_sql: String,
    /// The global ID of the sink itself.
    pub global_id: GlobalId,
    /// The global ID of the collection the sink reads from.
    pub from: GlobalId,
    /// The external connection the sink writes to.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    pub envelope: SinkEnvelope,
    pub with_snapshot: bool,
    pub version: u64,
    pub resolved_ids: ResolvedIds,
    pub cluster_id: ClusterId,
    pub commit_interval: Option<Duration>,
}
1315
1316impl Sink {
1317 pub fn sink_type(&self) -> &str {
1318 self.connection.name()
1319 }
1320
1321 pub fn envelope(&self) -> Option<&str> {
1323 match &self.envelope {
1324 SinkEnvelope::Debezium => Some("debezium"),
1325 SinkEnvelope::Upsert => Some("upsert"),
1326 SinkEnvelope::Append => Some("append"),
1327 }
1328 }
1329
1330 pub fn combined_format(&self) -> Option<Cow<'_, str>> {
1335 match &self.connection {
1336 StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
1337 StorageSinkConnection::Iceberg(_) => None,
1338 }
1339 }
1340
1341 pub fn formats(&self) -> Option<(Option<&str>, &str)> {
1343 match &self.connection {
1344 StorageSinkConnection::Kafka(connection) => {
1345 let key_format = connection
1346 .format
1347 .key_format
1348 .as_ref()
1349 .map(|f| f.get_format_name());
1350 let value_format = connection.format.value_format.get_format_name();
1351 Some((key_format, value_format))
1352 }
1353 StorageSinkConnection::Iceberg(_) => None,
1354 }
1355 }
1356
1357 pub fn connection_id(&self) -> Option<CatalogItemId> {
1358 self.connection.connection_id()
1359 }
1360
1361 pub fn global_id(&self) -> GlobalId {
1363 self.global_id
1364 }
1365}
1366
/// Catalog item for a (non-materialized) view.
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// The `CREATE` statement of this view. It is parsed and re-rendered
    /// whenever references inside it are rewritten.
    pub create_sql: String,
    /// The [`GlobalId`] of this view.
    pub global_id: GlobalId,
    /// NOTE(review): presumably the unoptimized HIR form of the view's query
    /// — confirm.
    pub raw_expr: Arc<HirRelationExpr>,
    /// NOTE(review): presumably the MIR form of the query after local
    /// optimization — confirm.
    pub locally_optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// The relation description of this view.
    pub desc: RelationDesc,
    /// The connection that owns this view, if any. An item with a connection
    /// id is reported as temporary by `CatalogItem::is_temporary`.
    pub conn_id: Option<ConnectionId>,
    /// The catalog objects this view references.
    pub resolved_ids: ResolvedIds,
    /// Additional item ids this view depends on; merged with `resolved_ids`
    /// by `CatalogItem::uses`.
    pub dependencies: DependencyIds,
}
1386
1387impl View {
1388 pub fn global_id(&self) -> GlobalId {
1390 self.global_id
1391 }
1392}
1393
/// Catalog item for a materialized view.
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// The `CREATE MATERIALIZED VIEW` statement of this item. It is parsed and
    /// re-rendered whenever it is rewritten.
    pub create_sql: String,
    /// Maps each relation version to the [`GlobalId`] of its collection.
    /// Expected to be non-empty; the entry with the greatest version is the
    /// one returned by `global_id_writes`. Map keys are serialized as strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// NOTE(review): presumably the unoptimized HIR form of the query —
    /// confirm.
    pub raw_expr: Arc<HirRelationExpr>,
    /// NOTE(review): presumably the MIR form of the query after local
    /// optimization — confirm.
    pub locally_optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// The versioned relation description; a concrete [`RelationDesc`] is
    /// obtained with `at_version`.
    pub desc: VersionedRelationDesc,
    /// The catalog objects this materialized view references.
    pub resolved_ids: ResolvedIds,
    /// Additional item ids this materialized view depends on; merged with
    /// `resolved_ids` by `CatalogItem::uses`.
    pub dependencies: DependencyIds,
    /// If set, the item this materialized view is a replacement for.
    /// `apply_replacement` requires it on the replacement and clears it on
    /// the resulting item.
    pub replacement_target: Option<CatalogItemId>,
    /// The cluster reported for this item by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// NOTE(review): presumably an optional replica the view is pinned to —
    /// confirm.
    pub target_replica: Option<ReplicaId>,
    /// NOTE(review): presumably indexes of columns asserted to be non-null —
    /// confirm.
    pub non_null_assertions: Vec<usize>,
    /// A custom logical compaction window, if one was set (e.g. through
    /// `CatalogItem::update_retain_history`).
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// NOTE(review): optional refresh schedule; semantics not visible here.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// NOTE(review): optional initial as-of frontier; semantics not visible
    /// here.
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
    /// Optimized dataflow plan, if available. Not serialized.
    #[serde(skip)]
    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical dataflow plan, if available. Not serialized.
    #[serde(skip)]
    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
    /// Dataflow metainfo (carrying optimizer notices), if available. Not
    /// serialized.
    #[serde(skip)]
    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
}
1444
1445impl MaterializedView {
1446 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1448 self.collections.values().copied()
1449 }
1450
1451 pub fn global_id_writes(&self) -> GlobalId {
1454 *self
1455 .collections
1456 .last_key_value()
1457 .expect("at least one version of a materialized view")
1458 .1
1459 }
1460
1461 pub fn collection_descs(
1463 &self,
1464 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
1465 self.collections.iter().map(|(version, gid)| {
1466 let desc = self
1467 .desc
1468 .at_version(RelationVersionSelector::Specific(*version));
1469 (*gid, *version, desc)
1470 })
1471 }
1472
1473 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
1475 let (version, _gid) = self
1476 .collections
1477 .iter()
1478 .find(|(_version, gid)| *gid == id)
1479 .expect("GlobalId to exist");
1480 self.desc
1481 .at_version(RelationVersionSelector::Specific(*version))
1482 }
1483
1484 pub fn apply_replacement(&mut self, replacement: Self) {
1486 let target_id = replacement
1487 .replacement_target
1488 .expect("replacement has target");
1489
1490 fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
1491 let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
1492 panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
1493 });
1494 if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
1495 cmvs
1496 } else {
1497 panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
1498 }
1499 }
1500
1501 let old_stmt = parse(&self.create_sql);
1502 let rpl_stmt = parse(&replacement.create_sql);
1503 let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
1504 if_exists: old_stmt.if_exists,
1505 name: old_stmt.name,
1506 columns: rpl_stmt.columns,
1507 replacement_for: None,
1508 in_cluster: rpl_stmt.in_cluster,
1509 in_cluster_replica: rpl_stmt.in_cluster_replica,
1510 query: rpl_stmt.query,
1511 as_of: rpl_stmt.as_of,
1512 with_options: rpl_stmt.with_options,
1513 };
1514 let create_sql = new_stmt.to_ast_string_stable();
1515
1516 let mut collections = std::mem::take(&mut self.collections);
1517 let latest_version = collections.keys().max().expect("at least one version");
1521 let new_version = latest_version.bump();
1522 collections.insert(new_version, replacement.global_id_writes());
1523
1524 let mut resolved_ids = replacement.resolved_ids;
1525 resolved_ids.remove_item(&target_id);
1526 let mut dependencies = replacement.dependencies;
1527 dependencies.0.remove(&target_id);
1528
1529 *self = Self {
1530 create_sql,
1531 collections,
1532 raw_expr: replacement.raw_expr,
1533 locally_optimized_expr: replacement.locally_optimized_expr,
1534 desc: replacement.desc,
1535 resolved_ids,
1536 dependencies,
1537 replacement_target: None,
1538 cluster_id: replacement.cluster_id,
1539 target_replica: replacement.target_replica,
1540 non_null_assertions: replacement.non_null_assertions,
1541 custom_logical_compaction_window: replacement.custom_logical_compaction_window,
1542 refresh_schedule: replacement.refresh_schedule,
1543 initial_as_of: replacement.initial_as_of,
1544 optimized_plan: replacement.optimized_plan,
1545 physical_plan: replacement.physical_plan,
1546 dataflow_metainfo: replacement.dataflow_metainfo,
1547 };
1548 }
1549}
1550
/// Catalog item for an index.
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// The `CREATE` statement of this index. It is parsed and re-rendered
    /// whenever it is rewritten.
    pub create_sql: String,
    /// The [`GlobalId`] of this index.
    pub global_id: GlobalId,
    /// NOTE(review): presumably the collection this index is built on —
    /// confirm.
    pub on: GlobalId,
    /// NOTE(review): presumably the key expressions of the index — confirm.
    pub keys: Arc<[MirScalarExpr]>,
    /// The connection that owns this index, if any. An item with a connection
    /// id is reported as temporary by `CatalogItem::is_temporary`.
    pub conn_id: Option<ConnectionId>,
    /// The catalog objects this index references.
    pub resolved_ids: ResolvedIds,
    /// The cluster reported for this index by `CatalogItem::cluster_id` and
    /// `CatalogItem::is_compute_object_on_cluster`.
    pub cluster_id: ClusterId,
    /// A custom logical compaction window, if one was set (e.g. through
    /// `CatalogItem::update_retain_history`).
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Flag returned by `CatalogItem::is_retained_metrics_object`.
    pub is_retained_metrics_object: bool,
    /// Optimized dataflow plan, if available. Not serialized.
    #[serde(skip)]
    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical dataflow plan, if available. Not serialized.
    #[serde(skip)]
    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
    /// Dataflow metainfo (carrying optimizer notices), if available. Not
    /// serialized.
    #[serde(skip)]
    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
}
1588
1589impl Index {
1590 pub fn global_id(&self) -> GlobalId {
1592 self.global_id
1593 }
1594}
1595
/// Catalog item for a type.
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// The `CREATE` statement of this type, if it has one.
    /// NOTE(review): presumably `None` for builtin types — confirm.
    pub create_sql: Option<String>,
    /// The [`GlobalId`] of this type.
    pub global_id: GlobalId,
    /// Details of the type. Not serialized.
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// The catalog objects this type references.
    pub resolved_ids: ResolvedIds,
}
1607
/// Catalog item for a function.
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// The statically allocated function definition. Not serialized.
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
    /// The [`GlobalId`] of this function.
    pub global_id: GlobalId,
}
1616
/// Catalog item for a secret.
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// The `CREATE` statement of this secret. It is parsed and re-rendered
    /// whenever it is rewritten.
    pub create_sql: String,
    /// The [`GlobalId`] of this secret.
    pub global_id: GlobalId,
}
1624
/// Catalog item for a connection.
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// The `CREATE` statement of this connection. It is parsed and
    /// re-rendered whenever it is rewritten.
    pub create_sql: String,
    /// The [`GlobalId`] of this connection.
    pub global_id: GlobalId,
    /// The details of the connection.
    pub details: ConnectionDetails,
    /// The catalog objects this connection references.
    pub resolved_ids: ResolvedIds,
}
1636
1637impl Connection {
1638 pub fn global_id(&self) -> GlobalId {
1640 self.global_id
1641 }
1642}
1643
/// In-memory representation of a network policy. Converts to and from
/// `durable::NetworkPolicy`.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
    /// The name of the policy.
    pub name: String,
    /// The id of the policy.
    pub id: NetworkPolicyId,
    /// The OID of the policy.
    pub oid: u32,
    /// The rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
    /// The role that owns the policy.
    pub owner_id: RoleId,
    /// The privileges on the policy. Stored as a [`PrivilegeMap`] in memory
    /// and as a collection of ACL items in the durable form.
    pub privileges: PrivilegeMap,
}
1653
1654impl From<NetworkPolicy> for durable::NetworkPolicy {
1655 fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1656 durable::NetworkPolicy {
1657 id: policy.id,
1658 oid: policy.oid,
1659 name: policy.name,
1660 rules: policy.rules,
1661 owner_id: policy.owner_id,
1662 privileges: policy.privileges.into_all_values().collect(),
1663 }
1664 }
1665}
1666
1667impl From<durable::NetworkPolicy> for NetworkPolicy {
1668 fn from(
1669 durable::NetworkPolicy {
1670 id,
1671 oid,
1672 name,
1673 rules,
1674 owner_id,
1675 privileges,
1676 }: durable::NetworkPolicy,
1677 ) -> Self {
1678 NetworkPolicy {
1679 id,
1680 oid,
1681 name,
1682 rules,
1683 owner_id,
1684 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1685 }
1686 }
1687}
1688
1689impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1690 fn update_from(
1691 &mut self,
1692 durable::NetworkPolicy {
1693 id,
1694 oid,
1695 name,
1696 rules,
1697 owner_id,
1698 privileges,
1699 }: durable::NetworkPolicy,
1700 ) {
1701 self.id = id;
1702 self.oid = oid;
1703 self.name = name;
1704 self.rules = rules;
1705 self.owner_id = owner_id;
1706 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1707 }
1708}
1709
1710impl CatalogItem {
1711 pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1713 match self {
1714 CatalogItem::Table(_) => CatalogItemType::Table,
1715 CatalogItem::Source(_) => CatalogItemType::Source,
1716 CatalogItem::Log(_) => CatalogItemType::Source,
1717 CatalogItem::Sink(_) => CatalogItemType::Sink,
1718 CatalogItem::View(_) => CatalogItemType::View,
1719 CatalogItem::MaterializedView(_) => CatalogItemType::MaterializedView,
1720 CatalogItem::Index(_) => CatalogItemType::Index,
1721 CatalogItem::Type(_) => CatalogItemType::Type,
1722 CatalogItem::Func(_) => CatalogItemType::Func,
1723 CatalogItem::Secret(_) => CatalogItemType::Secret,
1724 CatalogItem::Connection(_) => CatalogItemType::Connection,
1725 }
1726 }
1727
1728 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1730 let gid = match self {
1731 CatalogItem::Source(source) => source.global_id,
1732 CatalogItem::Log(log) => log.global_id,
1733 CatalogItem::Sink(sink) => sink.global_id,
1734 CatalogItem::View(view) => view.global_id,
1735 CatalogItem::MaterializedView(mv) => {
1736 return itertools::Either::Left(mv.collections.values().copied());
1737 }
1738 CatalogItem::Index(index) => index.global_id,
1739 CatalogItem::Func(func) => func.global_id,
1740 CatalogItem::Type(ty) => ty.global_id,
1741 CatalogItem::Secret(secret) => secret.global_id,
1742 CatalogItem::Connection(conn) => conn.global_id,
1743 CatalogItem::Table(table) => {
1744 return itertools::Either::Left(table.collections.values().copied());
1745 }
1746 };
1747 itertools::Either::Right(std::iter::once(gid))
1748 }
1749
1750 pub fn latest_global_id(&self) -> GlobalId {
1754 match self {
1755 CatalogItem::Source(source) => source.global_id,
1756 CatalogItem::Log(log) => log.global_id,
1757 CatalogItem::Sink(sink) => sink.global_id,
1758 CatalogItem::View(view) => view.global_id,
1759 CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
1760 CatalogItem::Index(index) => index.global_id,
1761 CatalogItem::Func(func) => func.global_id,
1762 CatalogItem::Type(ty) => ty.global_id,
1763 CatalogItem::Secret(secret) => secret.global_id,
1764 CatalogItem::Connection(conn) => conn.global_id,
1765 CatalogItem::Table(table) => table.global_id_writes(),
1766 }
1767 }
1768
1769 pub fn optimized_plan(&self) -> Option<&Arc<DataflowDescription<OptimizedMirRelationExpr>>> {
1771 match self {
1772 CatalogItem::Index(idx) => idx.optimized_plan.as_ref(),
1773 CatalogItem::MaterializedView(mv) => mv.optimized_plan.as_ref(),
1774 _ => None,
1775 }
1776 }
1777
1778 pub fn physical_plan(&self) -> Option<&Arc<DataflowDescription<ComputePlan>>> {
1780 match self {
1781 CatalogItem::Index(idx) => idx.physical_plan.as_ref(),
1782 CatalogItem::MaterializedView(mv) => mv.physical_plan.as_ref(),
1783 _ => None,
1784 }
1785 }
1786
1787 pub fn dataflow_metainfo(&self) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
1789 match self {
1790 CatalogItem::Index(idx) => idx.dataflow_metainfo.as_ref(),
1791 CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo.as_ref(),
1792 _ => None,
1793 }
1794 }
1795
1796 pub fn plan_fields_mut(
1801 &mut self,
1802 ) -> Option<(
1803 &mut Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
1804 &mut Option<Arc<DataflowDescription<ComputePlan>>>,
1805 &mut Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
1806 )> {
1807 match self {
1808 CatalogItem::Index(idx) => Some((
1809 &mut idx.optimized_plan,
1810 &mut idx.physical_plan,
1811 &mut idx.dataflow_metainfo,
1812 )),
1813 CatalogItem::MaterializedView(mv) => Some((
1814 &mut mv.optimized_plan,
1815 &mut mv.physical_plan,
1816 &mut mv.dataflow_metainfo,
1817 )),
1818 _ => None,
1819 }
1820 }
1821
1822 pub fn is_storage_collection(&self) -> bool {
1824 match self {
1825 CatalogItem::Table(_)
1826 | CatalogItem::Source(_)
1827 | CatalogItem::MaterializedView(_)
1828 | CatalogItem::Sink(_) => true,
1829 CatalogItem::Log(_)
1830 | CatalogItem::View(_)
1831 | CatalogItem::Index(_)
1832 | CatalogItem::Type(_)
1833 | CatalogItem::Func(_)
1834 | CatalogItem::Secret(_)
1835 | CatalogItem::Connection(_) => false,
1836 }
1837 }
1838
1839 pub fn relation_desc(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1848 match &self {
1849 CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1850 CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1851 CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1852 CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1853 CatalogItem::MaterializedView(mview) => {
1854 Some(Cow::Owned(mview.desc.at_version(version)))
1855 }
1856 CatalogItem::Func(_)
1857 | CatalogItem::Index(_)
1858 | CatalogItem::Sink(_)
1859 | CatalogItem::Secret(_)
1860 | CatalogItem::Connection(_)
1861 | CatalogItem::Type(_) => None,
1862 }
1863 }
1864
1865 pub fn func(
1866 &self,
1867 entry: &CatalogEntry,
1868 ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1869 match &self {
1870 CatalogItem::Func(func) => Ok(func.inner),
1871 _ => Err(SqlCatalogError::UnexpectedType {
1872 name: entry.name().item.to_string(),
1873 actual_type: entry.item_type(),
1874 expected_type: CatalogItemType::Func,
1875 }),
1876 }
1877 }
1878
1879 pub fn source_desc(
1880 &self,
1881 entry: &CatalogEntry,
1882 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1883 match &self {
1884 CatalogItem::Source(source) => match &source.data_source {
1885 DataSourceDesc::Ingestion { desc, .. }
1886 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1887 DataSourceDesc::IngestionExport { .. }
1888 | DataSourceDesc::Introspection(_)
1889 | DataSourceDesc::Webhook { .. }
1890 | DataSourceDesc::Progress
1891 | DataSourceDesc::Catalog => Ok(None),
1892 },
1893 _ => Err(SqlCatalogError::UnexpectedType {
1894 name: entry.name().item.to_string(),
1895 actual_type: entry.item_type(),
1896 expected_type: CatalogItemType::Source,
1897 }),
1898 }
1899 }
1900
1901 pub fn is_progress_source(&self) -> bool {
1903 matches!(
1904 self,
1905 CatalogItem::Source(Source {
1906 data_source: DataSourceDesc::Progress,
1907 ..
1908 })
1909 )
1910 }
1911
1912 pub fn references(&self) -> &ResolvedIds {
1915 static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1916 match self {
1917 CatalogItem::Func(_) => &*EMPTY,
1918 CatalogItem::Index(idx) => &idx.resolved_ids,
1919 CatalogItem::Sink(sink) => &sink.resolved_ids,
1920 CatalogItem::Source(source) => &source.resolved_ids,
1921 CatalogItem::Log(_) => &*EMPTY,
1922 CatalogItem::Table(table) => &table.resolved_ids,
1923 CatalogItem::Type(typ) => &typ.resolved_ids,
1924 CatalogItem::View(view) => &view.resolved_ids,
1925 CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1926 CatalogItem::Secret(_) => &*EMPTY,
1927 CatalogItem::Connection(connection) => &connection.resolved_ids,
1928 }
1929 }
1930
1931 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1937 let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1938 match self {
1939 CatalogItem::Func(_) => {}
1942 CatalogItem::Index(_) => {}
1943 CatalogItem::Sink(_) => {}
1944 CatalogItem::Source(_) => {}
1945 CatalogItem::Log(_) => {}
1946 CatalogItem::Table(_) => {}
1947 CatalogItem::Type(_) => {}
1948 CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1949 CatalogItem::MaterializedView(mview) => {
1950 uses.extend(mview.dependencies.0.iter().copied())
1951 }
1952 CatalogItem::Secret(_) => {}
1953 CatalogItem::Connection(_) => {}
1954 }
1955 uses
1956 }
1957
1958 pub fn conn_id(&self) -> Option<&ConnectionId> {
1961 match self {
1962 CatalogItem::View(view) => view.conn_id.as_ref(),
1963 CatalogItem::Index(index) => index.conn_id.as_ref(),
1964 CatalogItem::Table(table) => table.conn_id.as_ref(),
1965 CatalogItem::Log(_)
1966 | CatalogItem::Source(_)
1967 | CatalogItem::Sink(_)
1968 | CatalogItem::MaterializedView(_)
1969 | CatalogItem::Secret(_)
1970 | CatalogItem::Type(_)
1971 | CatalogItem::Func(_)
1972 | CatalogItem::Connection(_) => None,
1973 }
1974 }
1975
1976 pub fn set_conn_id(&mut self, conn_id: Option<ConnectionId>) {
1979 match self {
1980 CatalogItem::View(view) => view.conn_id = conn_id,
1981 CatalogItem::Index(index) => index.conn_id = conn_id,
1982 CatalogItem::Table(table) => table.conn_id = conn_id,
1983 CatalogItem::Log(_)
1984 | CatalogItem::Source(_)
1985 | CatalogItem::Sink(_)
1986 | CatalogItem::MaterializedView(_)
1987 | CatalogItem::Secret(_)
1988 | CatalogItem::Type(_)
1989 | CatalogItem::Func(_)
1990 | CatalogItem::Connection(_) => (),
1991 }
1992 }
1993
1994 pub fn is_temporary(&self) -> bool {
1996 self.conn_id().is_some()
1997 }
1998
1999 pub fn rename_schema_refs(
2000 &self,
2001 database_name: &str,
2002 cur_schema_name: &str,
2003 new_schema_name: &str,
2004 ) -> Result<CatalogItem, (String, String)> {
2005 let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
2006 let mut create_stmt = mz_sql::parse::parse(&create_sql)
2007 .expect("invalid create sql persisted to catalog")
2008 .into_element()
2009 .ast;
2010
2011 mz_sql::ast::transform::create_stmt_rename_schema_refs(
2013 &mut create_stmt,
2014 database_name,
2015 cur_schema_name,
2016 new_schema_name,
2017 )?;
2018
2019 Ok(create_stmt.to_ast_string_stable())
2020 };
2021
2022 match self {
2023 CatalogItem::Table(i) => {
2024 let mut i = i.clone();
2025 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2026 Ok(CatalogItem::Table(i))
2027 }
2028 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
2029 CatalogItem::Source(i) => {
2030 let mut i = i.clone();
2031 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2032 Ok(CatalogItem::Source(i))
2033 }
2034 CatalogItem::Sink(i) => {
2035 let mut i = i.clone();
2036 i.create_sql = do_rewrite(i.create_sql)?;
2037 Ok(CatalogItem::Sink(i))
2038 }
2039 CatalogItem::View(i) => {
2040 let mut i = i.clone();
2041 i.create_sql = do_rewrite(i.create_sql)?;
2042 Ok(CatalogItem::View(i))
2043 }
2044 CatalogItem::MaterializedView(i) => {
2045 let mut i = i.clone();
2046 i.create_sql = do_rewrite(i.create_sql)?;
2047 Ok(CatalogItem::MaterializedView(i))
2048 }
2049 CatalogItem::Index(i) => {
2050 let mut i = i.clone();
2051 i.create_sql = do_rewrite(i.create_sql)?;
2052 Ok(CatalogItem::Index(i))
2053 }
2054 CatalogItem::Secret(i) => {
2055 let mut i = i.clone();
2056 i.create_sql = do_rewrite(i.create_sql)?;
2057 Ok(CatalogItem::Secret(i))
2058 }
2059 CatalogItem::Connection(i) => {
2060 let mut i = i.clone();
2061 i.create_sql = do_rewrite(i.create_sql)?;
2062 Ok(CatalogItem::Connection(i))
2063 }
2064 CatalogItem::Type(i) => {
2065 let mut i = i.clone();
2066 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2067 Ok(CatalogItem::Type(i))
2068 }
2069 CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
2070 }
2071 }
2072
2073 pub fn rename_item_refs(
2077 &self,
2078 from: FullItemName,
2079 to_item_name: String,
2080 rename_self: bool,
2081 ) -> Result<CatalogItem, String> {
2082 let do_rewrite = |create_sql: String| -> Result<String, String> {
2083 let mut create_stmt = mz_sql::parse::parse(&create_sql)
2084 .expect("invalid create sql persisted to catalog")
2085 .into_element()
2086 .ast;
2087 if rename_self {
2088 mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
2089 }
2090 mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
2092 Ok(create_stmt.to_ast_string_stable())
2093 };
2094
2095 match self {
2096 CatalogItem::Table(i) => {
2097 let mut i = i.clone();
2098 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2099 Ok(CatalogItem::Table(i))
2100 }
2101 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
2102 CatalogItem::Source(i) => {
2103 let mut i = i.clone();
2104 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2105 Ok(CatalogItem::Source(i))
2106 }
2107 CatalogItem::Sink(i) => {
2108 let mut i = i.clone();
2109 i.create_sql = do_rewrite(i.create_sql)?;
2110 Ok(CatalogItem::Sink(i))
2111 }
2112 CatalogItem::View(i) => {
2113 let mut i = i.clone();
2114 i.create_sql = do_rewrite(i.create_sql)?;
2115 Ok(CatalogItem::View(i))
2116 }
2117 CatalogItem::MaterializedView(i) => {
2118 let mut i = i.clone();
2119 i.create_sql = do_rewrite(i.create_sql)?;
2120 Ok(CatalogItem::MaterializedView(i))
2121 }
2122 CatalogItem::Index(i) => {
2123 let mut i = i.clone();
2124 i.create_sql = do_rewrite(i.create_sql)?;
2125 Ok(CatalogItem::Index(i))
2126 }
2127 CatalogItem::Secret(i) => {
2128 let mut i = i.clone();
2129 i.create_sql = do_rewrite(i.create_sql)?;
2130 Ok(CatalogItem::Secret(i))
2131 }
2132 CatalogItem::Func(_) | CatalogItem::Type(_) => {
2133 unreachable!("{}s cannot be renamed", self.typ())
2134 }
2135 CatalogItem::Connection(i) => {
2136 let mut i = i.clone();
2137 i.create_sql = do_rewrite(i.create_sql)?;
2138 Ok(CatalogItem::Connection(i))
2139 }
2140 }
2141 }
2142
2143 pub fn replace_item_refs(&self, old_id: CatalogItemId, new_id: CatalogItemId) -> CatalogItem {
2145 let do_rewrite = |create_sql: String| -> String {
2146 let mut create_stmt = mz_sql::parse::parse(&create_sql)
2147 .expect("invalid create sql persisted to catalog")
2148 .into_element()
2149 .ast;
2150 mz_sql::ast::transform::create_stmt_replace_ids(
2151 &mut create_stmt,
2152 &[(old_id, new_id)].into(),
2153 );
2154 create_stmt.to_ast_string_stable()
2155 };
2156
2157 match self {
2158 CatalogItem::Table(i) => {
2159 let mut i = i.clone();
2160 i.create_sql = i.create_sql.map(do_rewrite);
2161 CatalogItem::Table(i)
2162 }
2163 CatalogItem::Log(i) => CatalogItem::Log(i.clone()),
2164 CatalogItem::Source(i) => {
2165 let mut i = i.clone();
2166 i.create_sql = i.create_sql.map(do_rewrite);
2167 CatalogItem::Source(i)
2168 }
2169 CatalogItem::Sink(i) => {
2170 let mut i = i.clone();
2171 i.create_sql = do_rewrite(i.create_sql);
2172 CatalogItem::Sink(i)
2173 }
2174 CatalogItem::View(i) => {
2175 let mut i = i.clone();
2176 i.create_sql = do_rewrite(i.create_sql);
2177 CatalogItem::View(i)
2178 }
2179 CatalogItem::MaterializedView(i) => {
2180 let mut i = i.clone();
2181 i.create_sql = do_rewrite(i.create_sql);
2182 CatalogItem::MaterializedView(i)
2183 }
2184 CatalogItem::Index(i) => {
2185 let mut i = i.clone();
2186 i.create_sql = do_rewrite(i.create_sql);
2187 CatalogItem::Index(i)
2188 }
2189 CatalogItem::Secret(i) => {
2190 let mut i = i.clone();
2191 i.create_sql = do_rewrite(i.create_sql);
2192 CatalogItem::Secret(i)
2193 }
2194 CatalogItem::Func(_) | CatalogItem::Type(_) => {
2195 unreachable!("references of {}s cannot be replaced", self.typ())
2196 }
2197 CatalogItem::Connection(i) => {
2198 let mut i = i.clone();
2199 i.create_sql = do_rewrite(i.create_sql);
2200 CatalogItem::Connection(i)
2201 }
2202 }
2203 }
2204 pub fn update_retain_history(
2207 &mut self,
2208 value: Option<Value>,
2209 window: CompactionWindow,
2210 ) -> Result<Option<WithOptionValue<Raw>>, ()> {
2211 let update = |mut ast: &mut Statement<Raw>| {
2212 macro_rules! update_retain_history {
2214 ( $stmt:ident, $opt:ident, $name:ident ) => {{
2215 let pos = $stmt
2217 .with_options
2218 .iter()
2219 .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
2221 if let Some(value) = value {
2222 let next = mz_sql_parser::ast::$opt {
2223 name: mz_sql_parser::ast::$name::RetainHistory,
2224 value: Some(WithOptionValue::RetainHistoryFor(value)),
2225 };
2226 if let Some(idx) = pos {
2227 let previous = $stmt.with_options[idx].clone();
2228 $stmt.with_options[idx] = next;
2229 previous.value
2230 } else {
2231 $stmt.with_options.push(next);
2232 None
2233 }
2234 } else {
2235 if let Some(idx) = pos {
2236 $stmt.with_options.swap_remove(idx).value
2237 } else {
2238 None
2239 }
2240 }
2241 }};
2242 }
2243 let previous = match &mut ast {
2244 Statement::CreateTable(stmt) => {
2245 update_retain_history!(stmt, TableOption, TableOptionName)
2246 }
2247 Statement::CreateIndex(stmt) => {
2248 update_retain_history!(stmt, IndexOption, IndexOptionName)
2249 }
2250 Statement::CreateSource(stmt) => {
2251 update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
2252 }
2253 Statement::CreateMaterializedView(stmt) => {
2254 update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
2255 }
2256 _ => {
2257 return Err(());
2258 }
2259 };
2260 Ok(previous)
2261 };
2262
2263 let res = self.update_sql(update)?;
2264 let cw = self
2265 .custom_logical_compaction_window_mut()
2266 .expect("item must have compaction window");
2267 *cw = Some(window);
2268 Ok(res)
2269 }
2270
2271 pub fn update_timestamp_interval(
2274 &mut self,
2275 value: Option<Value>,
2276 interval: Duration,
2277 ) -> Result<Option<WithOptionValue<Raw>>, ()> {
2278 let update = |ast: &mut Statement<Raw>| match ast {
2279 Statement::CreateSource(stmt) => {
2280 let pos = stmt.with_options.iter().rposition(|o| {
2281 o.name == mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval
2282 });
2283 let previous = if let Some(value) = value {
2284 let next = mz_sql_parser::ast::CreateSourceOption {
2285 name: mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval,
2286 value: Some(WithOptionValue::Value(value)),
2287 };
2288 if let Some(idx) = pos {
2289 let previous = stmt.with_options[idx].clone();
2290 stmt.with_options[idx] = next;
2291 previous.value
2292 } else {
2293 stmt.with_options.push(next);
2294 None
2295 }
2296 } else if let Some(idx) = pos {
2297 stmt.with_options.swap_remove(idx).value
2298 } else {
2299 None
2300 };
2301 Ok(previous)
2302 }
2303 _ => Err(()),
2304 };
2305
2306 let previous = self.update_sql(update)?;
2307
2308 match self {
2310 CatalogItem::Source(source) => {
2311 match &mut source.data_source {
2312 DataSourceDesc::Ingestion { desc, .. }
2313 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
2314 desc.timestamp_interval = interval;
2315 }
2316 _ => return Err(()),
2317 }
2318 Ok(previous)
2319 }
2320 _ => Err(()),
2321 }
2322 }
2323
2324 pub fn add_column(
2325 &mut self,
2326 name: ColumnName,
2327 typ: SqlColumnType,
2328 sql: RawDataType,
2329 ) -> Result<RelationVersion, PlanError> {
2330 let CatalogItem::Table(table) = self else {
2331 return Err(PlanError::Unsupported {
2332 feature: "adding columns to a non-Table".to_string(),
2333 discussion_no: None,
2334 });
2335 };
2336 let next_version = table.desc.add_column(name.clone(), typ);
2337
2338 let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2339 Statement::CreateTable(stmt) => {
2340 let version = ColumnOptionDef {
2341 name: None,
2342 option: ColumnOption::Versioned {
2343 action: ColumnVersioned::Added,
2344 version: next_version.into(),
2345 },
2346 };
2347 let column = ColumnDef {
2348 name: name.into(),
2349 data_type: sql,
2350 collation: None,
2351 options: vec![version],
2352 };
2353 stmt.columns.push(column);
2354 Ok(())
2355 }
2356 _ => Err(()),
2357 };
2358
2359 self.update_sql(update)
2360 .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2361 Ok(next_version)
2362 }
2363
2364 pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2367 where
2368 F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2369 {
2370 let create_sql = match self {
2371 CatalogItem::Table(Table { create_sql, .. })
2372 | CatalogItem::Type(Type { create_sql, .. })
2373 | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2374 CatalogItem::Sink(Sink { create_sql, .. })
2375 | CatalogItem::View(View { create_sql, .. })
2376 | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2377 | CatalogItem::Index(Index { create_sql, .. })
2378 | CatalogItem::Secret(Secret { create_sql, .. })
2379 | CatalogItem::Connection(Connection { create_sql, .. }) => Some(create_sql),
2380 CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2381 };
2382 let Some(create_sql) = create_sql else {
2383 return Err(());
2384 };
2385 let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2386 .expect("non-system items must be parseable")
2387 .into_element()
2388 .ast;
2389 debug!("rewrite: {}", ast.to_ast_string_redacted());
2390 let t = f(&mut ast)?;
2391 *create_sql = ast.to_ast_string_stable();
2392 debug!("rewrote: {}", ast.to_ast_string_redacted());
2393 Ok(t)
2394 }
2395
2396 pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2403 match self {
2404 CatalogItem::Index(index) => Some(index.cluster_id),
2405 CatalogItem::Table(_)
2406 | CatalogItem::Source(_)
2407 | CatalogItem::Log(_)
2408 | CatalogItem::View(_)
2409 | CatalogItem::MaterializedView(_)
2410 | CatalogItem::Sink(_)
2411 | CatalogItem::Type(_)
2412 | CatalogItem::Func(_)
2413 | CatalogItem::Secret(_)
2414 | CatalogItem::Connection(_) => None,
2415 }
2416 }
2417
2418 pub fn cluster_id(&self) -> Option<ClusterId> {
2419 match self {
2420 CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2421 CatalogItem::Index(index) => Some(index.cluster_id),
2422 CatalogItem::Source(source) => match &source.data_source {
2423 DataSourceDesc::Ingestion { cluster_id, .. }
2424 | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
2425 DataSourceDesc::IngestionExport { .. } => None,
2429 DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2430 DataSourceDesc::Introspection(_)
2431 | DataSourceDesc::Progress
2432 | DataSourceDesc::Catalog => None,
2433 },
2434 CatalogItem::Sink(sink) => Some(sink.cluster_id),
2435 CatalogItem::Table(_)
2436 | CatalogItem::Log(_)
2437 | CatalogItem::View(_)
2438 | CatalogItem::Type(_)
2439 | CatalogItem::Func(_)
2440 | CatalogItem::Secret(_)
2441 | CatalogItem::Connection(_) => None,
2442 }
2443 }
2444
2445 pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2448 match self {
2449 CatalogItem::Table(table) => table.custom_logical_compaction_window,
2450 CatalogItem::Source(source) => source.custom_logical_compaction_window,
2451 CatalogItem::Index(index) => index.custom_logical_compaction_window,
2452 CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2453 CatalogItem::Log(_)
2454 | CatalogItem::View(_)
2455 | CatalogItem::Sink(_)
2456 | CatalogItem::Type(_)
2457 | CatalogItem::Func(_)
2458 | CatalogItem::Secret(_)
2459 | CatalogItem::Connection(_) => None,
2460 }
2461 }
2462
2463 pub fn custom_logical_compaction_window_mut(
2467 &mut self,
2468 ) -> Option<&mut Option<CompactionWindow>> {
2469 let cw = match self {
2470 CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2471 CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2472 CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2473 CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2474 CatalogItem::Log(_)
2475 | CatalogItem::View(_)
2476 | CatalogItem::Sink(_)
2477 | CatalogItem::Type(_)
2478 | CatalogItem::Func(_)
2479 | CatalogItem::Secret(_)
2480 | CatalogItem::Connection(_) => return None,
2481 };
2482 Some(cw)
2483 }
2484
2485 pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2493 let custom_logical_compaction_window = match self {
2494 CatalogItem::Table(_)
2495 | CatalogItem::Source(_)
2496 | CatalogItem::Index(_)
2497 | CatalogItem::MaterializedView(_) => self.custom_logical_compaction_window(),
2498 CatalogItem::Log(_)
2499 | CatalogItem::View(_)
2500 | CatalogItem::Sink(_)
2501 | CatalogItem::Type(_)
2502 | CatalogItem::Func(_)
2503 | CatalogItem::Secret(_)
2504 | CatalogItem::Connection(_) => return None,
2505 };
2506 Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2507 }
2508
2509 pub fn is_retained_metrics_object(&self) -> bool {
2513 match self {
2514 CatalogItem::Table(table) => table.is_retained_metrics_object,
2515 CatalogItem::Source(source) => source.is_retained_metrics_object,
2516 CatalogItem::Index(index) => index.is_retained_metrics_object,
2517 CatalogItem::Log(_)
2518 | CatalogItem::View(_)
2519 | CatalogItem::MaterializedView(_)
2520 | CatalogItem::Sink(_)
2521 | CatalogItem::Type(_)
2522 | CatalogItem::Func(_)
2523 | CatalogItem::Secret(_)
2524 | CatalogItem::Connection(_) => false,
2525 }
2526 }
2527
2528 pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2529 match self {
2530 CatalogItem::Table(table) => {
2531 let create_sql = table
2532 .create_sql
2533 .clone()
2534 .expect("builtin tables cannot be serialized");
2535 let mut collections = table.collections.clone();
2536 let global_id = collections
2537 .remove(&RelationVersion::root())
2538 .expect("at least one version");
2539 (create_sql, global_id, collections)
2540 }
2541 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2542 CatalogItem::Source(source) => {
2543 assert!(
2544 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2545 "cannot serialize introspection/builtin sources",
2546 );
2547 let create_sql = source
2548 .create_sql
2549 .clone()
2550 .expect("builtin sources cannot be serialized");
2551 (create_sql, source.global_id, BTreeMap::new())
2552 }
2553 CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2554 CatalogItem::MaterializedView(mview) => {
2555 let mut collections = mview.collections.clone();
2556 let global_id = collections
2557 .remove(&RelationVersion::root())
2558 .expect("at least one version");
2559 (mview.create_sql.clone(), global_id, collections)
2560 }
2561 CatalogItem::Index(index) => {
2562 (index.create_sql.clone(), index.global_id, BTreeMap::new())
2563 }
2564 CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2565 CatalogItem::Type(typ) => {
2566 let create_sql = typ
2567 .create_sql
2568 .clone()
2569 .expect("builtin types cannot be serialized");
2570 (create_sql, typ.global_id, BTreeMap::new())
2571 }
2572 CatalogItem::Secret(secret) => {
2573 (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2574 }
2575 CatalogItem::Connection(connection) => (
2576 connection.create_sql.clone(),
2577 connection.global_id,
2578 BTreeMap::new(),
2579 ),
2580 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2581 }
2582 }
2583
2584 pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2585 match self {
2586 CatalogItem::Table(mut table) => {
2587 let create_sql = table
2588 .create_sql
2589 .expect("builtin tables cannot be serialized");
2590 let global_id = table
2591 .collections
2592 .remove(&RelationVersion::root())
2593 .expect("at least one version");
2594 (create_sql, global_id, table.collections)
2595 }
2596 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2597 CatalogItem::Source(source) => {
2598 assert!(
2599 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2600 "cannot serialize introspection/builtin sources",
2601 );
2602 let create_sql = source
2603 .create_sql
2604 .expect("builtin sources cannot be serialized");
2605 (create_sql, source.global_id, BTreeMap::new())
2606 }
2607 CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2608 CatalogItem::MaterializedView(mut mview) => {
2609 let global_id = mview
2610 .collections
2611 .remove(&RelationVersion::root())
2612 .expect("at least one version");
2613 (mview.create_sql, global_id, mview.collections)
2614 }
2615 CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2616 CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2617 CatalogItem::Type(typ) => {
2618 let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2619 (create_sql, typ.global_id, BTreeMap::new())
2620 }
2621 CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2622 CatalogItem::Connection(connection) => {
2623 (connection.create_sql, connection.global_id, BTreeMap::new())
2624 }
2625 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2626 }
2627 }
2628
2629 pub fn global_id_for_version(&self, version: RelationVersionSelector) -> Option<GlobalId> {
2632 let collections = match self {
2633 CatalogItem::MaterializedView(mv) => &mv.collections,
2634 CatalogItem::Table(table) => &table.collections,
2635 CatalogItem::Source(source) => return Some(source.global_id),
2636 CatalogItem::Log(log) => return Some(log.global_id),
2637 CatalogItem::View(view) => return Some(view.global_id),
2638 CatalogItem::Sink(sink) => return Some(sink.global_id),
2639 CatalogItem::Index(index) => return Some(index.global_id),
2640 CatalogItem::Type(ty) => return Some(ty.global_id),
2641 CatalogItem::Func(func) => return Some(func.global_id),
2642 CatalogItem::Secret(secret) => return Some(secret.global_id),
2643 CatalogItem::Connection(conn) => return Some(conn.global_id),
2644 };
2645 match version {
2646 RelationVersionSelector::Latest => collections.values().last().copied(),
2647 RelationVersionSelector::Specific(version) => collections.get(&version).copied(),
2648 }
2649 }
2650}
2651
2652impl CatalogEntry {
2653 pub fn relation_desc_latest(&self) -> Option<Cow<'_, RelationDesc>> {
2656 self.item.relation_desc(RelationVersionSelector::Latest)
2657 }
2658
2659 pub fn has_columns(&self) -> bool {
2661 match self.item() {
2662 CatalogItem::Type(Type { details, .. }) => {
2663 matches!(details.typ, CatalogType::Record { .. })
2664 }
2665 _ => self.relation_desc_latest().is_some(),
2666 }
2667 }
2668
2669 pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2671 self.item.func(self)
2672 }
2673
2674 pub fn index(&self) -> Option<&Index> {
2676 match self.item() {
2677 CatalogItem::Index(idx) => Some(idx),
2678 _ => None,
2679 }
2680 }
2681
2682 pub fn materialized_view(&self) -> Option<&MaterializedView> {
2684 match self.item() {
2685 CatalogItem::MaterializedView(mv) => Some(mv),
2686 _ => None,
2687 }
2688 }
2689
2690 pub fn table(&self) -> Option<&Table> {
2692 match self.item() {
2693 CatalogItem::Table(tbl) => Some(tbl),
2694 _ => None,
2695 }
2696 }
2697
2698 pub fn source(&self) -> Option<&Source> {
2700 match self.item() {
2701 CatalogItem::Source(src) => Some(src),
2702 _ => None,
2703 }
2704 }
2705
2706 pub fn sink(&self) -> Option<&Sink> {
2708 match self.item() {
2709 CatalogItem::Sink(sink) => Some(sink),
2710 _ => None,
2711 }
2712 }
2713
2714 pub fn secret(&self) -> Option<&Secret> {
2716 match self.item() {
2717 CatalogItem::Secret(secret) => Some(secret),
2718 _ => None,
2719 }
2720 }
2721
2722 pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2723 match self.item() {
2724 CatalogItem::Connection(connection) => Ok(connection),
2725 _ => {
2726 let db_name = match self.name().qualifiers.database_spec {
2727 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2728 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2729 };
2730 Err(SqlCatalogError::UnknownConnection(format!(
2731 "{}{}.{}",
2732 db_name,
2733 self.name().qualifiers.schema_spec,
2734 self.name().item
2735 )))
2736 }
2737 }
2738 }
2739
2740 pub fn source_desc(
2743 &self,
2744 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2745 self.item.source_desc(self)
2746 }
2747
2748 pub fn is_connection(&self) -> bool {
2750 matches!(self.item(), CatalogItem::Connection(_))
2751 }
2752
2753 pub fn is_table(&self) -> bool {
2755 matches!(self.item(), CatalogItem::Table(_))
2756 }
2757
2758 pub fn is_source(&self) -> bool {
2761 matches!(self.item(), CatalogItem::Source(_))
2762 }
2763
2764 pub fn subsource_details(
2767 &self,
2768 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2769 match &self.item() {
2770 CatalogItem::Source(source) => match &source.data_source {
2771 DataSourceDesc::IngestionExport {
2772 ingestion_id,
2773 external_reference,
2774 details,
2775 data_config: _,
2776 } => Some((*ingestion_id, external_reference, details)),
2777 _ => None,
2778 },
2779 _ => None,
2780 }
2781 }
2782
2783 pub fn source_export_details(
2786 &self,
2787 ) -> Option<(
2788 CatalogItemId,
2789 &UnresolvedItemName,
2790 &SourceExportDetails,
2791 &SourceExportDataConfig<ReferencedConnection>,
2792 )> {
2793 match &self.item() {
2794 CatalogItem::Source(source) => match &source.data_source {
2795 DataSourceDesc::IngestionExport {
2796 ingestion_id,
2797 external_reference,
2798 details,
2799 data_config,
2800 } => Some((*ingestion_id, external_reference, details, data_config)),
2801 _ => None,
2802 },
2803 CatalogItem::Table(table) => match &table.data_source {
2804 TableDataSource::DataSource {
2805 desc:
2806 DataSourceDesc::IngestionExport {
2807 ingestion_id,
2808 external_reference,
2809 details,
2810 data_config,
2811 },
2812 timeline: _,
2813 } => Some((*ingestion_id, external_reference, details, data_config)),
2814 _ => None,
2815 },
2816 _ => None,
2817 }
2818 }
2819
2820 pub fn is_progress_source(&self) -> bool {
2822 self.item().is_progress_source()
2823 }
2824
2825 pub fn progress_id(&self) -> Option<CatalogItemId> {
2827 match &self.item() {
2828 CatalogItem::Source(source) => match &source.data_source {
2829 DataSourceDesc::Ingestion { .. } => Some(self.id),
2830 DataSourceDesc::OldSyntaxIngestion {
2831 progress_subsource, ..
2832 } => Some(*progress_subsource),
2833 DataSourceDesc::IngestionExport { .. }
2834 | DataSourceDesc::Introspection(_)
2835 | DataSourceDesc::Progress
2836 | DataSourceDesc::Webhook { .. }
2837 | DataSourceDesc::Catalog => None,
2838 },
2839 CatalogItem::Table(_)
2840 | CatalogItem::Log(_)
2841 | CatalogItem::View(_)
2842 | CatalogItem::MaterializedView(_)
2843 | CatalogItem::Sink(_)
2844 | CatalogItem::Index(_)
2845 | CatalogItem::Type(_)
2846 | CatalogItem::Func(_)
2847 | CatalogItem::Secret(_)
2848 | CatalogItem::Connection(_) => None,
2849 }
2850 }
2851
2852 pub fn is_sink(&self) -> bool {
2854 matches!(self.item(), CatalogItem::Sink(_))
2855 }
2856
2857 pub fn is_materialized_view(&self) -> bool {
2859 matches!(self.item(), CatalogItem::MaterializedView(_))
2860 }
2861
2862 pub fn is_view(&self) -> bool {
2864 matches!(self.item(), CatalogItem::View(_))
2865 }
2866
2867 pub fn is_secret(&self) -> bool {
2869 matches!(self.item(), CatalogItem::Secret(_))
2870 }
2871
2872 pub fn is_introspection_source(&self) -> bool {
2874 matches!(self.item(), CatalogItem::Log(_))
2875 }
2876
2877 pub fn is_index(&self) -> bool {
2879 matches!(self.item(), CatalogItem::Index(_))
2880 }
2881
2882 pub fn is_relation(&self) -> bool {
2884 mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2885 }
2886
2887 pub fn references(&self) -> &ResolvedIds {
2890 self.item.references()
2891 }
2892
2893 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2899 self.item.uses()
2900 }
2901
2902 pub fn item(&self) -> &CatalogItem {
2904 &self.item
2905 }
2906
2907 pub fn item_mut(&mut self) -> &mut CatalogItem {
2910 &mut self.item
2911 }
2912
2913 pub fn id(&self) -> CatalogItemId {
2915 self.id
2916 }
2917
2918 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2920 self.item().global_ids()
2921 }
2922
2923 pub fn latest_global_id(&self) -> GlobalId {
2924 self.item().latest_global_id()
2925 }
2926
2927 pub fn oid(&self) -> u32 {
2929 self.oid
2930 }
2931
2932 pub fn name(&self) -> &QualifiedItemName {
2934 &self.name
2935 }
2936
2937 pub fn referenced_by(&self) -> &[CatalogItemId] {
2939 &self.referenced_by
2940 }
2941
2942 pub fn used_by(&self) -> &[CatalogItemId] {
2944 &self.used_by
2945 }
2946
2947 pub fn conn_id(&self) -> Option<&ConnectionId> {
2950 self.item.conn_id()
2951 }
2952
2953 pub fn owner_id(&self) -> &RoleId {
2955 &self.owner_id
2956 }
2957
2958 pub fn privileges(&self) -> &PrivilegeMap {
2960 &self.privileges
2961 }
2962
2963 pub fn comment_object_id(&self) -> CommentObjectId {
2965 use CatalogItemType::*;
2966 match self.item_type() {
2967 Table => CommentObjectId::Table(self.id),
2968 Source => CommentObjectId::Source(self.id),
2969 Sink => CommentObjectId::Sink(self.id),
2970 View => CommentObjectId::View(self.id),
2971 MaterializedView => CommentObjectId::MaterializedView(self.id),
2972 Index => CommentObjectId::Index(self.id),
2973 Func => CommentObjectId::Func(self.id),
2974 Connection => CommentObjectId::Connection(self.id),
2975 Type => CommentObjectId::Type(self.id),
2976 Secret => CommentObjectId::Secret(self.id),
2977 }
2978 }
2979}
2980
2981#[derive(Debug, Clone, Default)]
2982pub struct CommentsMap {
2983 map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
2984}
2985
2986impl CommentsMap {
2987 pub fn update_comment(
2988 &mut self,
2989 object_id: CommentObjectId,
2990 sub_component: Option<usize>,
2991 comment: Option<String>,
2992 ) -> Option<String> {
2993 let object_comments = self.map.entry(object_id).or_default();
2994
2995 let (empty, prev) = if let Some(comment) = comment {
2997 let prev = object_comments.insert(sub_component, comment);
2998 (false, prev)
2999 } else {
3000 let prev = object_comments.remove(&sub_component);
3001 (object_comments.is_empty(), prev)
3002 };
3003
3004 if empty {
3006 self.map.remove(&object_id);
3007 }
3008
3009 prev
3011 }
3012
3013 pub fn drop_comments(
3019 &mut self,
3020 object_ids: &BTreeSet<CommentObjectId>,
3021 ) -> Vec<(CommentObjectId, Option<usize>, String)> {
3022 let mut removed_comments = Vec::new();
3023
3024 for object_id in object_ids {
3025 if let Some(comments) = self.map.remove(object_id) {
3026 let removed = comments
3027 .into_iter()
3028 .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
3029 removed_comments.extend(removed);
3030 }
3031 }
3032
3033 removed_comments
3034 }
3035
3036 pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
3037 self.map
3038 .iter()
3039 .map(|(id, comments)| {
3040 comments
3041 .iter()
3042 .map(|(pos, comment)| (*id, *pos, comment.as_str()))
3043 })
3044 .flatten()
3045 }
3046
3047 pub fn get_object_comments(
3048 &self,
3049 object_id: CommentObjectId,
3050 ) -> Option<&BTreeMap<Option<usize>, String>> {
3051 self.map.get(&object_id)
3052 }
3053}
3054
3055impl Serialize for CommentsMap {
3056 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
3057 where
3058 S: serde::Serializer,
3059 {
3060 let comment_count = self
3061 .map
3062 .iter()
3063 .map(|(_object_id, comments)| comments.len())
3064 .sum();
3065
3066 let mut seq = serializer.serialize_seq(Some(comment_count))?;
3067 for (object_id, sub) in &self.map {
3068 for (sub_component, comment) in sub {
3069 seq.serialize_element(&(
3070 format!("{object_id:?}"),
3071 format!("{sub_component:?}"),
3072 comment,
3073 ))?;
3074 }
3075 }
3076 seq.end()
3077 }
3078}
3079
3080#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
3081pub struct DefaultPrivileges {
3082 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
3083 privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
3084}
3085
3086#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
3089struct RoleDefaultPrivileges(
3090 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
3092 BTreeMap<RoleId, DefaultPrivilegeAclItem>,
3093);
3094
3095impl Deref for RoleDefaultPrivileges {
3096 type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
3097
3098 fn deref(&self) -> &Self::Target {
3099 &self.0
3100 }
3101}
3102
3103impl DerefMut for RoleDefaultPrivileges {
3104 fn deref_mut(&mut self) -> &mut Self::Target {
3105 &mut self.0
3106 }
3107}
3108
3109impl DefaultPrivileges {
3110 pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
3112 if privilege.acl_mode.is_empty() {
3113 return;
3114 }
3115
3116 let privileges = self.privileges.entry(object).or_default();
3117 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3118 default_privilege.acl_mode |= privilege.acl_mode;
3119 } else {
3120 privileges.insert(privilege.grantee, privilege);
3121 }
3122 }
3123
3124 pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
3126 if let Some(privileges) = self.privileges.get_mut(object) {
3127 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3128 default_privilege.acl_mode =
3129 default_privilege.acl_mode.difference(privilege.acl_mode);
3130 if default_privilege.acl_mode.is_empty() {
3131 privileges.remove(&privilege.grantee);
3132 }
3133 }
3134 if privileges.is_empty() {
3135 self.privileges.remove(object);
3136 }
3137 }
3138 }
3139
3140 pub fn get_privileges_for_grantee(
3143 &self,
3144 object: &DefaultPrivilegeObject,
3145 grantee: &RoleId,
3146 ) -> Option<&AclMode> {
3147 self.privileges
3148 .get(object)
3149 .and_then(|privileges| privileges.get(grantee))
3150 .map(|privilege| &privilege.acl_mode)
3151 }
3152
3153 pub fn get_applicable_privileges(
3155 &self,
3156 role_id: RoleId,
3157 database_id: Option<DatabaseId>,
3158 schema_id: Option<SchemaId>,
3159 object_type: mz_sql::catalog::ObjectType,
3160 ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
3161 let privilege_object_type = if object_type.is_relation() {
3165 mz_sql::catalog::ObjectType::Table
3166 } else {
3167 object_type
3168 };
3169 let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
3170
3171 [
3175 DefaultPrivilegeObject {
3176 role_id,
3177 database_id,
3178 schema_id,
3179 object_type: privilege_object_type,
3180 },
3181 DefaultPrivilegeObject {
3182 role_id,
3183 database_id,
3184 schema_id: None,
3185 object_type: privilege_object_type,
3186 },
3187 DefaultPrivilegeObject {
3188 role_id,
3189 database_id: None,
3190 schema_id: None,
3191 object_type: privilege_object_type,
3192 },
3193 DefaultPrivilegeObject {
3194 role_id: RoleId::Public,
3195 database_id,
3196 schema_id,
3197 object_type: privilege_object_type,
3198 },
3199 DefaultPrivilegeObject {
3200 role_id: RoleId::Public,
3201 database_id,
3202 schema_id: None,
3203 object_type: privilege_object_type,
3204 },
3205 DefaultPrivilegeObject {
3206 role_id: RoleId::Public,
3207 database_id: None,
3208 schema_id: None,
3209 object_type: privilege_object_type,
3210 },
3211 ]
3212 .into_iter()
3213 .filter_map(|object| self.privileges.get(&object))
3214 .flat_map(|acl_map| acl_map.values())
3215 .fold(
3217 BTreeMap::new(),
3218 |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
3219 let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
3220 *accum_acl_mode |= *acl_mode;
3221 accum
3222 },
3223 )
3224 .into_iter()
3225 .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
3230 .filter(|(_, acl_mode)| !acl_mode.is_empty())
3232 .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
3233 grantee: *grantee,
3234 acl_mode,
3235 })
3236 }
3237
3238 pub fn iter(
3239 &self,
3240 ) -> impl Iterator<
3241 Item = (
3242 &DefaultPrivilegeObject,
3243 impl Iterator<Item = &DefaultPrivilegeAclItem>,
3244 ),
3245 > {
3246 self.privileges
3247 .iter()
3248 .map(|(object, acl_map)| (object, acl_map.values()))
3249 }
3250}
3251
3252#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3253pub struct ClusterConfig {
3254 pub variant: ClusterVariant,
3255 pub workload_class: Option<String>,
3256}
3257
3258impl ClusterConfig {
3259 pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
3260 match &self.variant {
3261 ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
3262 ClusterVariant::Unmanaged => None,
3263 }
3264 }
3265}
3266
3267impl From<ClusterConfig> for durable::ClusterConfig {
3268 fn from(config: ClusterConfig) -> Self {
3269 Self {
3270 variant: config.variant.into(),
3271 workload_class: config.workload_class,
3272 }
3273 }
3274}
3275
3276impl From<durable::ClusterConfig> for ClusterConfig {
3277 fn from(config: durable::ClusterConfig) -> Self {
3278 Self {
3279 variant: config.variant.into(),
3280 workload_class: config.workload_class,
3281 }
3282 }
3283}
3284
3285#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3286pub struct ClusterVariantManaged {
3287 pub size: String,
3288 pub availability_zones: Vec<String>,
3289 pub logging: ReplicaLogging,
3290 pub replication_factor: u32,
3291 pub optimizer_feature_overrides: OptimizerFeatureOverrides,
3292 pub schedule: ClusterSchedule,
3293}
3294
3295impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3296 fn from(managed: ClusterVariantManaged) -> Self {
3297 Self {
3298 size: managed.size,
3299 availability_zones: managed.availability_zones,
3300 logging: managed.logging,
3301 replication_factor: managed.replication_factor,
3302 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3303 schedule: managed.schedule,
3304 }
3305 }
3306}
3307
3308impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3309 fn from(managed: durable::ClusterVariantManaged) -> Self {
3310 Self {
3311 size: managed.size,
3312 availability_zones: managed.availability_zones,
3313 logging: managed.logging,
3314 replication_factor: managed.replication_factor,
3315 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3316 schedule: managed.schedule,
3317 }
3318 }
3319}
3320
3321#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3322pub enum ClusterVariant {
3323 Managed(ClusterVariantManaged),
3324 Unmanaged,
3325}
3326
3327impl From<ClusterVariant> for durable::ClusterVariant {
3328 fn from(variant: ClusterVariant) -> Self {
3329 match variant {
3330 ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3331 ClusterVariant::Unmanaged => Self::Unmanaged,
3332 }
3333 }
3334}
3335
3336impl From<durable::ClusterVariant> for ClusterVariant {
3337 fn from(variant: durable::ClusterVariant) -> Self {
3338 match variant {
3339 durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3340 durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3341 }
3342 }
3343}
3344
3345impl mz_sql::catalog::CatalogDatabase for Database {
3346 fn name(&self) -> &str {
3347 &self.name
3348 }
3349
3350 fn id(&self) -> DatabaseId {
3351 self.id
3352 }
3353
3354 fn has_schemas(&self) -> bool {
3355 !self.schemas_by_name.is_empty()
3356 }
3357
3358 fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
3359 &self.schemas_by_name
3360 }
3361
3362 #[allow(clippy::as_conversions)]
3364 fn schemas(&self) -> Vec<&dyn CatalogSchema> {
3365 self.schemas_by_id
3366 .values()
3367 .map(|schema| schema as &dyn CatalogSchema)
3368 .collect()
3369 }
3370
3371 fn owner_id(&self) -> RoleId {
3372 self.owner_id
3373 }
3374
3375 fn privileges(&self) -> &PrivilegeMap {
3376 &self.privileges
3377 }
3378}
3379
3380impl mz_sql::catalog::CatalogSchema for Schema {
3381 fn database(&self) -> &ResolvedDatabaseSpecifier {
3382 &self.name.database
3383 }
3384
3385 fn name(&self) -> &QualifiedSchemaName {
3386 &self.name
3387 }
3388
3389 fn id(&self) -> &SchemaSpecifier {
3390 &self.id
3391 }
3392
3393 fn has_items(&self) -> bool {
3394 !self.items.is_empty()
3395 }
3396
3397 fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
3398 Box::new(
3399 self.items
3400 .values()
3401 .chain(self.functions.values())
3402 .chain(self.types.values())
3403 .copied(),
3404 )
3405 }
3406
3407 fn owner_id(&self) -> RoleId {
3408 self.owner_id
3409 }
3410
3411 fn privileges(&self) -> &PrivilegeMap {
3412 &self.privileges
3413 }
3414}
3415
3416impl mz_sql::catalog::CatalogRole for Role {
3417 fn name(&self) -> &str {
3418 &self.name
3419 }
3420
3421 fn id(&self) -> RoleId {
3422 self.id
3423 }
3424
3425 fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
3426 &self.membership.map
3427 }
3428
3429 fn attributes(&self) -> &RoleAttributes {
3430 &self.attributes
3431 }
3432
3433 fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
3434 &self.vars.map
3435 }
3436}
3437
3438impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
3439 fn name(&self) -> &str {
3440 &self.name
3441 }
3442
3443 fn id(&self) -> NetworkPolicyId {
3444 self.id
3445 }
3446
3447 fn owner_id(&self) -> RoleId {
3448 self.owner_id
3449 }
3450
3451 fn privileges(&self) -> &PrivilegeMap {
3452 &self.privileges
3453 }
3454}
3455
3456impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
3457 fn name(&self) -> &str {
3458 &self.name
3459 }
3460
3461 fn id(&self) -> ClusterId {
3462 self.id
3463 }
3464
3465 fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
3466 &self.bound_objects
3467 }
3468
3469 fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
3470 &self.replica_id_by_name_
3471 }
3472
3473 #[allow(clippy::as_conversions)]
3475 fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
3476 self.replicas()
3477 .map(|replica| replica as &dyn CatalogClusterReplica)
3478 .collect()
3479 }
3480
3481 fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
3482 self.replica(id).expect("catalog out of sync")
3483 }
3484
3485 fn owner_id(&self) -> RoleId {
3486 self.owner_id
3487 }
3488
3489 fn privileges(&self) -> &PrivilegeMap {
3490 &self.privileges
3491 }
3492
3493 fn is_managed(&self) -> bool {
3494 self.is_managed()
3495 }
3496
3497 fn managed_size(&self) -> Option<&str> {
3498 match &self.config.variant {
3499 ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
3500 ClusterVariant::Unmanaged => None,
3501 }
3502 }
3503
3504 fn schedule(&self) -> Option<&ClusterSchedule> {
3505 match &self.config.variant {
3506 ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
3507 ClusterVariant::Unmanaged => None,
3508 }
3509 }
3510
3511 fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
3512 self.try_to_plan()
3513 }
3514}
3515
3516impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
3517 fn name(&self) -> &str {
3518 &self.name
3519 }
3520
3521 fn cluster_id(&self) -> ClusterId {
3522 self.cluster_id
3523 }
3524
3525 fn replica_id(&self) -> ReplicaId {
3526 self.replica_id
3527 }
3528
3529 fn owner_id(&self) -> RoleId {
3530 self.owner_id
3531 }
3532
3533 fn internal(&self) -> bool {
3534 self.config.location.internal()
3535 }
3536}
3537
3538impl mz_sql::catalog::CatalogItem for CatalogEntry {
3539 fn name(&self) -> &QualifiedItemName {
3540 self.name()
3541 }
3542
3543 fn id(&self) -> CatalogItemId {
3544 self.id()
3545 }
3546
3547 fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3548 Box::new(self.global_ids())
3549 }
3550
3551 fn oid(&self) -> u32 {
3552 self.oid()
3553 }
3554
3555 fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3556 self.func()
3557 }
3558
3559 fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3560 self.source_desc()
3561 }
3562
3563 fn connection(
3564 &self,
3565 ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3566 {
3567 Ok(self.connection()?.details.to_connection())
3568 }
3569
3570 fn create_sql(&self) -> &str {
3571 match self.item() {
3572 CatalogItem::Table(Table { create_sql, .. }) => {
3573 create_sql.as_deref().unwrap_or("<builtin>")
3574 }
3575 CatalogItem::Source(Source { create_sql, .. }) => {
3576 create_sql.as_deref().unwrap_or("<builtin>")
3577 }
3578 CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3579 CatalogItem::View(View { create_sql, .. }) => create_sql,
3580 CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3581 CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3582 CatalogItem::Type(Type { create_sql, .. }) => {
3583 create_sql.as_deref().unwrap_or("<builtin>")
3584 }
3585 CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3586 CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3587 CatalogItem::Func(_) => "<builtin>",
3588 CatalogItem::Log(_) => "<builtin>",
3589 }
3590 }
3591
3592 fn item_type(&self) -> SqlCatalogItemType {
3593 self.item().typ()
3594 }
3595
3596 fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3597 if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3598 Some((keys, *on))
3599 } else {
3600 None
3601 }
3602 }
3603
3604 fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3605 if let CatalogItem::Table(Table {
3606 data_source: TableDataSource::TableWrites { defaults },
3607 ..
3608 }) = self.item()
3609 {
3610 Some(defaults.as_slice())
3611 } else {
3612 None
3613 }
3614 }
3615
3616 fn replacement_target(&self) -> Option<CatalogItemId> {
3617 if let CatalogItem::MaterializedView(mv) = self.item() {
3618 mv.replacement_target
3619 } else {
3620 None
3621 }
3622 }
3623
3624 fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3625 if let CatalogItem::Type(Type { details, .. }) = self.item() {
3626 Some(details)
3627 } else {
3628 None
3629 }
3630 }
3631
3632 fn references(&self) -> &ResolvedIds {
3633 self.references()
3634 }
3635
3636 fn uses(&self) -> BTreeSet<CatalogItemId> {
3637 self.uses()
3638 }
3639
3640 fn referenced_by(&self) -> &[CatalogItemId] {
3641 self.referenced_by()
3642 }
3643
3644 fn used_by(&self) -> &[CatalogItemId] {
3645 self.used_by()
3646 }
3647
3648 fn subsource_details(
3649 &self,
3650 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3651 self.subsource_details()
3652 }
3653
3654 fn source_export_details(
3655 &self,
3656 ) -> Option<(
3657 CatalogItemId,
3658 &UnresolvedItemName,
3659 &SourceExportDetails,
3660 &SourceExportDataConfig<ReferencedConnection>,
3661 )> {
3662 self.source_export_details()
3663 }
3664
3665 fn is_progress_source(&self) -> bool {
3666 self.is_progress_source()
3667 }
3668
3669 fn progress_id(&self) -> Option<CatalogItemId> {
3670 self.progress_id()
3671 }
3672
3673 fn owner_id(&self) -> RoleId {
3674 self.owner_id
3675 }
3676
3677 fn privileges(&self) -> &PrivilegeMap {
3678 &self.privileges
3679 }
3680
3681 fn cluster_id(&self) -> Option<ClusterId> {
3682 self.item().cluster_id()
3683 }
3684
3685 fn at_version(
3686 &self,
3687 version: RelationVersionSelector,
3688 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3689 Box::new(CatalogCollectionEntry {
3690 entry: self.clone(),
3691 version,
3692 })
3693 }
3694
3695 fn latest_version(&self) -> Option<RelationVersion> {
3696 self.table().map(|t| t.desc.latest_version())
3697 }
3698}
3699
3700#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
3702pub struct StateUpdate {
3703 pub kind: StateUpdateKind,
3704 pub ts: Timestamp,
3705 pub diff: StateDiff,
3706}
3707
3708#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
3712pub enum StateUpdateKind {
3713 Role(durable::objects::Role),
3714 RoleAuth(durable::objects::RoleAuth),
3715 Database(durable::objects::Database),
3716 Schema(durable::objects::Schema),
3717 DefaultPrivilege(durable::objects::DefaultPrivilege),
3718 SystemPrivilege(MzAclItem),
3719 SystemConfiguration(durable::objects::SystemConfiguration),
3720 Cluster(durable::objects::Cluster),
3721 NetworkPolicy(durable::objects::NetworkPolicy),
3722 IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3723 ClusterReplica(durable::objects::ClusterReplica),
3724 SourceReferences(durable::objects::SourceReferences),
3725 SystemObjectMapping(durable::objects::SystemObjectMapping),
3726 TemporaryItem(TemporaryItem),
3730 Item(durable::objects::Item),
3731 Comment(durable::objects::Comment),
3732 AuditLog(durable::objects::AuditLog),
3733 StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3735 UnfinalizedShard(durable::objects::UnfinalizedShard),
3736}
3737
/// The direction of a [`StateUpdate`].
///
/// Converts to and from [`Diff`]: `Retraction` corresponds to
/// `Diff::MINUS_ONE` and `Addition` to `Diff::ONE`. Under the derived `Ord`,
/// `Retraction` sorts before `Addition`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub enum StateDiff {
    /// The state is being removed.
    Retraction,
    /// The state is being added.
    Addition,
}
3744
3745impl From<StateDiff> for Diff {
3746 fn from(diff: StateDiff) -> Self {
3747 match diff {
3748 StateDiff::Retraction => Diff::MINUS_ONE,
3749 StateDiff::Addition => Diff::ONE,
3750 }
3751 }
3752}
3753impl TryFrom<Diff> for StateDiff {
3754 type Error = String;
3755
3756 fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3757 match diff {
3758 Diff::MINUS_ONE => Ok(Self::Retraction),
3759 Diff::ONE => Ok(Self::Addition),
3760 diff => Err(format!("invalid diff {diff}")),
3761 }
3762 }
3763}
3764
3765#[derive(Debug, Clone, Ord, PartialOrd, PartialEq, Eq)]
3767pub struct TemporaryItem {
3768 pub id: CatalogItemId,
3769 pub oid: u32,
3770 pub global_id: GlobalId,
3771 pub schema_id: SchemaId,
3772 pub name: String,
3773 pub conn_id: Option<ConnectionId>,
3774 pub create_sql: String,
3775 pub owner_id: RoleId,
3776 pub privileges: Vec<MzAclItem>,
3777 pub extra_versions: BTreeMap<RelationVersion, GlobalId>,
3778}
3779
3780impl From<CatalogEntry> for TemporaryItem {
3781 fn from(entry: CatalogEntry) -> Self {
3782 let conn_id = entry.conn_id().cloned();
3783 let (create_sql, global_id, extra_versions) = entry.item.to_serialized();
3784
3785 TemporaryItem {
3786 id: entry.id,
3787 oid: entry.oid,
3788 global_id,
3789 schema_id: entry.name.qualifiers.schema_spec.into(),
3790 name: entry.name.item,
3791 conn_id,
3792 create_sql,
3793 owner_id: entry.owner_id,
3794 privileges: entry.privileges.into_all_values().collect(),
3795 extra_versions,
3796 }
3797 }
3798}
3799
3800impl TemporaryItem {
3801 pub fn item_type(&self) -> CatalogItemType {
3802 item_type(&self.create_sql)
3803 }
3804}
3805
3806#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
3808pub enum BootstrapStateUpdateKind {
3809 Role(durable::objects::Role),
3810 RoleAuth(durable::objects::RoleAuth),
3811 Database(durable::objects::Database),
3812 Schema(durable::objects::Schema),
3813 DefaultPrivilege(durable::objects::DefaultPrivilege),
3814 SystemPrivilege(MzAclItem),
3815 SystemConfiguration(durable::objects::SystemConfiguration),
3816 Cluster(durable::objects::Cluster),
3817 NetworkPolicy(durable::objects::NetworkPolicy),
3818 IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3819 ClusterReplica(durable::objects::ClusterReplica),
3820 SourceReferences(durable::objects::SourceReferences),
3821 SystemObjectMapping(durable::objects::SystemObjectMapping),
3822 Item(durable::objects::Item),
3823 Comment(durable::objects::Comment),
3824 AuditLog(durable::objects::AuditLog),
3825 StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3827 UnfinalizedShard(durable::objects::UnfinalizedShard),
3828}
3829
3830impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3831 fn from(value: BootstrapStateUpdateKind) -> Self {
3832 match value {
3833 BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3834 BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3835 BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3836 BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3837 BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3838 StateUpdateKind::DefaultPrivilege(kind)
3839 }
3840 BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3841 StateUpdateKind::SystemPrivilege(kind)
3842 }
3843 BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3844 StateUpdateKind::SystemConfiguration(kind)
3845 }
3846 BootstrapStateUpdateKind::SourceReferences(kind) => {
3847 StateUpdateKind::SourceReferences(kind)
3848 }
3849 BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3850 BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3851 BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3852 StateUpdateKind::IntrospectionSourceIndex(kind)
3853 }
3854 BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3855 BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3856 StateUpdateKind::SystemObjectMapping(kind)
3857 }
3858 BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3859 BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3860 BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3861 BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3862 StateUpdateKind::StorageCollectionMetadata(kind)
3863 }
3864 BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3865 StateUpdateKind::UnfinalizedShard(kind)
3866 }
3867 }
3868 }
3869}
3870
3871impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3872 type Error = TemporaryItem;
3873
3874 fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3875 match value {
3876 StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3877 StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3878 StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3879 StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3880 StateUpdateKind::DefaultPrivilege(kind) => {
3881 Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3882 }
3883 StateUpdateKind::SystemPrivilege(kind) => {
3884 Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3885 }
3886 StateUpdateKind::SystemConfiguration(kind) => {
3887 Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3888 }
3889 StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3890 StateUpdateKind::NetworkPolicy(kind) => {
3891 Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3892 }
3893 StateUpdateKind::IntrospectionSourceIndex(kind) => {
3894 Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3895 }
3896 StateUpdateKind::ClusterReplica(kind) => {
3897 Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3898 }
3899 StateUpdateKind::SourceReferences(kind) => {
3900 Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3901 }
3902 StateUpdateKind::SystemObjectMapping(kind) => {
3903 Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3904 }
3905 StateUpdateKind::TemporaryItem(kind) => Err(kind),
3906 StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3907 StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3908 StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3909 StateUpdateKind::StorageCollectionMetadata(kind) => {
3910 Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3911 }
3912 StateUpdateKind::UnfinalizedShard(kind) => {
3913 Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3914 }
3915 }
3916 }
3917}