1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_compute_client::logging::LogVariant;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_compute_types::plan::Plan as ComputePlan;
26use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
27use mz_controller_types::{ClusterId, ReplicaId};
28use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
29use mz_ore::collections::CollectionExt;
30use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
31use mz_repr::network_policy_id::NetworkPolicyId;
32use mz_repr::optimize::OptimizerFeatureOverrides;
33use mz_repr::refresh_schedule::RefreshSchedule;
34use mz_repr::role_id::RoleId;
35use mz_repr::{
36 CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
37 RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
38};
39use mz_sql::ast::display::AstDisplay;
40use mz_sql::ast::{
41 ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
42 UnresolvedItemName, Value, WithOptionValue,
43};
44use mz_sql::catalog::{
45 CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
46 CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogType,
47 CatalogTypeDetails, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference,
48 RoleAttributes, RoleMembership, RoleVars, SystemObjectType,
49};
50use mz_sql::names::{
51 Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
52 QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
53};
54use mz_sql::plan::{
55 ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
56 CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
57 HirRelationExpr, NetworkPolicyRule, PlanError, WebhookBodyFormat, WebhookHeaders,
58 WebhookValidation,
59};
60use mz_sql::rbac;
61use mz_sql::session::vars::OwnedVarInput;
62use mz_storage_client::controller::IntrospectionType;
63use mz_storage_types::connections::inline::ReferencedConnection;
64use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
65use mz_storage_types::sources::load_generator::LoadGenerator;
66use mz_storage_types::sources::{
67 GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
68 SourceExportDetails, Timeline,
69};
70use mz_transform::dataflow::DataflowMetainfo;
71use mz_transform::notice::OptimizerNotice;
72use serde::ser::SerializeSeq;
73use serde::{Deserialize, Serialize};
74use timely::progress::Antichain;
75use tracing::debug;
76
77use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
78use crate::durable;
79use crate::durable::objects::item_type;
80
/// Extension of [`From`] for types that can also be refreshed in place from a `T`.
///
/// Where `From<T>` builds a new value, `update_from` overwrites the state of an
/// existing value with the contents of a `T`.
pub trait UpdateFrom<T>: From<T> {
    /// Overwrites the relevant state of `self` with the contents of `from`.
    fn update_from(&mut self, from: T);
}
85
/// Catalog representation of a database, including the schemas tracked for it.
///
/// Converts to and from [`durable::Database`]; the two schema maps have no
/// counterpart in the durable form.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    /// Name of the database.
    pub name: String,
    /// Identifier of the database.
    pub id: DatabaseId,
    /// OID of the database.
    pub oid: u32,
    /// Schemas keyed by their ID. Serialized through `map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Schema IDs keyed by a string, presumably the schema name (per the field name).
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    /// Role that owns the database.
    pub owner_id: RoleId,
    /// Privileges recorded for the database.
    pub privileges: PrivilegeMap,
}
97
98impl From<Database> for durable::Database {
99 fn from(database: Database) -> durable::Database {
100 durable::Database {
101 id: database.id,
102 oid: database.oid,
103 name: database.name,
104 owner_id: database.owner_id,
105 privileges: database.privileges.into_all_values().collect(),
106 }
107 }
108}
109
110impl From<durable::Database> for Database {
111 fn from(
112 durable::Database {
113 id,
114 oid,
115 name,
116 owner_id,
117 privileges,
118 }: durable::Database,
119 ) -> Database {
120 Database {
121 id,
122 oid,
123 schemas_by_id: BTreeMap::new(),
124 schemas_by_name: BTreeMap::new(),
125 name,
126 owner_id,
127 privileges: PrivilegeMap::from_mz_acl_items(privileges),
128 }
129 }
130}
131
132impl UpdateFrom<durable::Database> for Database {
133 fn update_from(
134 &mut self,
135 durable::Database {
136 id,
137 oid,
138 name,
139 owner_id,
140 privileges,
141 }: durable::Database,
142 ) {
143 self.id = id;
144 self.oid = oid;
145 self.name = name;
146 self.owner_id = owner_id;
147 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
148 }
149}
150
/// Catalog representation of a schema, including name-to-ID maps for its contents.
///
/// Converts to and from [`durable::Schema`]; the `items`, `functions` and `types`
/// maps have no counterpart in the durable form.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
    /// Qualified name: the owning database specifier plus the schema name.
    pub name: QualifiedSchemaName,
    /// Identifier of the schema.
    pub id: SchemaSpecifier,
    /// OID of the schema.
    pub oid: u32,
    /// Catalog item IDs keyed by string, presumably the item name.
    pub items: BTreeMap<String, CatalogItemId>,
    /// Function item IDs keyed by string, presumably the function name.
    pub functions: BTreeMap<String, CatalogItemId>,
    /// Type item IDs keyed by string, presumably the type name.
    pub types: BTreeMap<String, CatalogItemId>,
    /// Role that owns the schema.
    pub owner_id: RoleId,
    /// Privileges recorded for the schema.
    pub privileges: PrivilegeMap,
}
162
163impl From<Schema> for durable::Schema {
164 fn from(schema: Schema) -> durable::Schema {
165 durable::Schema {
166 id: schema.id.into(),
167 oid: schema.oid,
168 name: schema.name.schema,
169 database_id: schema.name.database.id(),
170 owner_id: schema.owner_id,
171 privileges: schema.privileges.into_all_values().collect(),
172 }
173 }
174}
175
176impl From<durable::Schema> for Schema {
177 fn from(
178 durable::Schema {
179 id,
180 oid,
181 name,
182 database_id,
183 owner_id,
184 privileges,
185 }: durable::Schema,
186 ) -> Schema {
187 Schema {
188 name: QualifiedSchemaName {
189 database: database_id.into(),
190 schema: name,
191 },
192 id: id.into(),
193 oid,
194 items: BTreeMap::new(),
195 functions: BTreeMap::new(),
196 types: BTreeMap::new(),
197 owner_id,
198 privileges: PrivilegeMap::from_mz_acl_items(privileges),
199 }
200 }
201}
202
203impl UpdateFrom<durable::Schema> for Schema {
204 fn update_from(
205 &mut self,
206 durable::Schema {
207 id,
208 oid,
209 name,
210 database_id,
211 owner_id,
212 privileges,
213 }: durable::Schema,
214 ) {
215 self.name = QualifiedSchemaName {
216 database: database_id.into(),
217 schema: name,
218 };
219 self.id = id.into();
220 self.oid = oid;
221 self.owner_id = owner_id;
222 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
223 }
224}
225
/// Catalog representation of a role.
///
/// Carries the same fields as [`durable::Role`] and converts to and from it.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    /// Name of the role.
    pub name: String,
    /// Identifier of the role.
    pub id: RoleId,
    /// OID of the role.
    pub oid: u32,
    /// Attributes of the role.
    pub attributes: RoleAttributes,
    /// Membership information for the role.
    pub membership: RoleMembership,
    /// Variables stored for the role; iterated by [`Role::vars`].
    pub vars: RoleVars,
}
235
236impl Role {
237 pub fn is_user(&self) -> bool {
238 self.id.is_user()
239 }
240
241 pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
242 self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
243 }
244}
245
246impl From<Role> for durable::Role {
247 fn from(role: Role) -> durable::Role {
248 durable::Role {
249 id: role.id,
250 oid: role.oid,
251 name: role.name,
252 attributes: role.attributes,
253 membership: role.membership,
254 vars: role.vars,
255 }
256 }
257}
258
259impl From<durable::Role> for Role {
260 fn from(
261 durable::Role {
262 id,
263 oid,
264 name,
265 attributes,
266 membership,
267 vars,
268 }: durable::Role,
269 ) -> Self {
270 Role {
271 name,
272 id,
273 oid,
274 attributes,
275 membership,
276 vars,
277 }
278 }
279}
280
281impl UpdateFrom<durable::Role> for Role {
282 fn update_from(
283 &mut self,
284 durable::Role {
285 id,
286 oid,
287 name,
288 attributes,
289 membership,
290 vars,
291 }: durable::Role,
292 ) {
293 self.id = id;
294 self.oid = oid;
295 self.name = name;
296 self.attributes = attributes;
297 self.membership = membership;
298 self.vars = vars;
299 }
300}
301
/// Catalog representation of a role's authentication record.
///
/// Carries the same fields as [`durable::RoleAuth`] and converts to and from it.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RoleAuth {
    /// Role this record belongs to.
    pub role_id: RoleId,
    /// Password hash, if one is set.
    pub password_hash: Option<String>,
    /// Time of the last update. NOTE(review): unit/epoch not visible here — confirm.
    pub updated_at: u64,
}
308
309impl From<RoleAuth> for durable::RoleAuth {
310 fn from(role_auth: RoleAuth) -> durable::RoleAuth {
311 durable::RoleAuth {
312 role_id: role_auth.role_id,
313 password_hash: role_auth.password_hash,
314 updated_at: role_auth.updated_at,
315 }
316 }
317}
318
319impl From<durable::RoleAuth> for RoleAuth {
320 fn from(
321 durable::RoleAuth {
322 role_id,
323 password_hash,
324 updated_at,
325 }: durable::RoleAuth,
326 ) -> RoleAuth {
327 RoleAuth {
328 role_id,
329 password_hash,
330 updated_at,
331 }
332 }
333}
334
335impl UpdateFrom<durable::RoleAuth> for RoleAuth {
336 fn update_from(&mut self, from: durable::RoleAuth) {
337 self.role_id = from.role_id;
338 self.password_hash = from.password_hash;
339 }
340}
341
/// Catalog representation of a cluster together with its replicas.
///
/// Converts to and from [`durable::Cluster`]; only `id`, `name`, `owner_id`,
/// `privileges` and `config` are part of the durable form.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    /// Name of the cluster.
    pub name: String,
    /// Identifier of the cluster.
    pub id: ClusterId,
    /// Configuration of the cluster, including its managed/unmanaged variant.
    pub config: ClusterConfig,
    /// Global IDs keyed by log variant. Excluded from serialized output.
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    /// Catalog items associated with this cluster.
    /// NOTE(review): exact meaning of "bound" is not visible here — confirm.
    pub bound_objects: BTreeSet<CatalogItemId>,
    /// Replica IDs keyed by replica name; read through [`Cluster::replica_id`].
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    /// Replicas keyed by ID; read through [`Cluster::replica`] and
    /// [`Cluster::replicas`]. Serialized through `map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    /// Role that owns the cluster.
    pub owner_id: RoleId,
    /// Privileges recorded for the cluster.
    pub privileges: PrivilegeMap,
}
358
359impl Cluster {
360 pub fn role(&self) -> ClusterRole {
362 if self.name == MZ_SYSTEM_CLUSTER.name {
365 ClusterRole::SystemCritical
366 } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
367 ClusterRole::System
368 } else {
369 ClusterRole::User
370 }
371 }
372
373 pub fn is_managed(&self) -> bool {
375 matches!(self.config.variant, ClusterVariant::Managed { .. })
376 }
377
378 pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
380 self.replicas().filter(|r| !r.config.location.internal())
381 }
382
383 pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
385 self.replicas_by_id_.values()
386 }
387
388 pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
390 self.replicas_by_id_.get(&replica_id)
391 }
392
393 pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
395 self.replica_id_by_name_.get(name).copied()
396 }
397
398 pub fn availability_zones(&self) -> Option<&[String]> {
400 match &self.config.variant {
401 ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
402 ClusterVariant::Unmanaged => None,
403 }
404 }
405
406 pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
407 let name = self.name.clone();
408 let variant = match &self.config.variant {
409 ClusterVariant::Managed(ClusterVariantManaged {
410 size,
411 availability_zones,
412 logging,
413 replication_factor,
414 optimizer_feature_overrides,
415 schedule,
416 }) => {
417 let introspection = match logging {
418 ReplicaLogging {
419 log_logging,
420 interval: Some(interval),
421 } => Some(ComputeReplicaIntrospectionConfig {
422 debugging: *log_logging,
423 interval: interval.clone(),
424 }),
425 ReplicaLogging {
426 log_logging: _,
427 interval: None,
428 } => None,
429 };
430 let compute = ComputeReplicaConfig { introspection };
431 CreateClusterVariant::Managed(CreateClusterManagedPlan {
432 replication_factor: replication_factor.clone(),
433 size: size.clone(),
434 availability_zones: availability_zones.clone(),
435 compute,
436 optimizer_feature_overrides: optimizer_feature_overrides.clone(),
437 schedule: schedule.clone(),
438 })
439 }
440 ClusterVariant::Unmanaged => {
441 return Err(PlanError::Unsupported {
444 feature: "SHOW CREATE for unmanaged clusters".to_string(),
445 discussion_no: None,
446 });
447 }
448 };
449 let workload_class = self.config.workload_class.clone();
450 Ok(CreateClusterPlan {
451 name,
452 variant,
453 workload_class,
454 })
455 }
456}
457
458impl From<Cluster> for durable::Cluster {
459 fn from(cluster: Cluster) -> durable::Cluster {
460 durable::Cluster {
461 id: cluster.id,
462 name: cluster.name,
463 owner_id: cluster.owner_id,
464 privileges: cluster.privileges.into_all_values().collect(),
465 config: cluster.config.into(),
466 }
467 }
468}
469
470impl From<durable::Cluster> for Cluster {
471 fn from(
472 durable::Cluster {
473 id,
474 name,
475 owner_id,
476 privileges,
477 config,
478 }: durable::Cluster,
479 ) -> Self {
480 Cluster {
481 name: name.clone(),
482 id,
483 bound_objects: BTreeSet::new(),
484 log_indexes: BTreeMap::new(),
485 replica_id_by_name_: BTreeMap::new(),
486 replicas_by_id_: BTreeMap::new(),
487 owner_id,
488 privileges: PrivilegeMap::from_mz_acl_items(privileges),
489 config: config.into(),
490 }
491 }
492}
493
494impl UpdateFrom<durable::Cluster> for Cluster {
495 fn update_from(
496 &mut self,
497 durable::Cluster {
498 id,
499 name,
500 owner_id,
501 privileges,
502 config,
503 }: durable::Cluster,
504 ) {
505 self.id = id;
506 self.name = name;
507 self.owner_id = owner_id;
508 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
509 self.config = config.into();
510 }
511}
512
/// Catalog representation of a single cluster replica.
///
/// Converts into [`durable::ClusterReplica`].
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    /// Name of the replica.
    pub name: String,
    /// Cluster this replica belongs to.
    pub cluster_id: ClusterId,
    /// Identifier of the replica.
    pub replica_id: ReplicaId,
    /// Configuration of the replica.
    pub config: ReplicaConfig,
    /// Role that owns the replica.
    pub owner_id: RoleId,
}
521
522impl From<ClusterReplica> for durable::ClusterReplica {
523 fn from(replica: ClusterReplica) -> durable::ClusterReplica {
524 durable::ClusterReplica {
525 cluster_id: replica.cluster_id,
526 replica_id: replica.replica_id,
527 name: replica.name,
528 config: replica.config.into(),
529 owner_id: replica.owner_id,
530 }
531 }
532}
533
/// A cluster status paired with a UTC timestamp.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    /// The reported status.
    pub status: ClusterStatus,
    /// Timestamp associated with the status.
    /// NOTE(review): presumably when the status was observed — confirm.
    pub time: DateTime<Utc>,
}
539
/// A set of source references together with the time they were last updated.
///
/// Converts to and from both `durable::SourceReferences` (via
/// [`SourceReferences::to_durable`], which additionally needs the source ID) and
/// `mz_sql::plan::SourceReferences`.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
    /// Time of the last update. NOTE(review): unit/epoch not visible here — confirm.
    pub updated_at: u64,
    /// The individual references.
    pub references: Vec<SourceReference>,
}
545
/// A single source reference: a name, an optional namespace, and column names.
///
/// Carries the same fields as `durable::SourceReference` and
/// `mz_sql::plan::SourceReference`, and converts to and from both.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
    /// Name of the referenced object.
    pub name: String,
    /// Namespace of the referenced object, if any.
    pub namespace: Option<String>,
    /// Column names of the referenced object.
    pub columns: Vec<String>,
}
552
553impl From<SourceReference> for durable::SourceReference {
554 fn from(source_reference: SourceReference) -> durable::SourceReference {
555 durable::SourceReference {
556 name: source_reference.name,
557 namespace: source_reference.namespace,
558 columns: source_reference.columns,
559 }
560 }
561}
562
563impl SourceReferences {
564 pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
565 durable::SourceReferences {
566 source_id,
567 updated_at: self.updated_at,
568 references: self.references.into_iter().map(Into::into).collect(),
569 }
570 }
571}
572
573impl From<durable::SourceReference> for SourceReference {
574 fn from(source_reference: durable::SourceReference) -> SourceReference {
575 SourceReference {
576 name: source_reference.name,
577 namespace: source_reference.namespace,
578 columns: source_reference.columns,
579 }
580 }
581}
582
583impl From<durable::SourceReferences> for SourceReferences {
584 fn from(source_references: durable::SourceReferences) -> SourceReferences {
585 SourceReferences {
586 updated_at: source_references.updated_at,
587 references: source_references
588 .references
589 .into_iter()
590 .map(|source_reference| source_reference.into())
591 .collect(),
592 }
593 }
594}
595
596impl From<mz_sql::plan::SourceReference> for SourceReference {
597 fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
598 SourceReference {
599 name: source_reference.name,
600 namespace: source_reference.namespace,
601 columns: source_reference.columns,
602 }
603 }
604}
605
606impl From<mz_sql::plan::SourceReferences> for SourceReferences {
607 fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
608 SourceReferences {
609 updated_at: source_references.updated_at,
610 references: source_references
611 .references
612 .into_iter()
613 .map(|source_reference| source_reference.into())
614 .collect(),
615 }
616 }
617}
618
619impl From<SourceReferences> for mz_sql::plan::SourceReferences {
620 fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
621 mz_sql::plan::SourceReferences {
622 updated_at: source_references.updated_at,
623 references: source_references
624 .references
625 .into_iter()
626 .map(|source_reference| source_reference.into())
627 .collect(),
628 }
629 }
630}
631
632impl From<SourceReference> for mz_sql::plan::SourceReference {
633 fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
634 mz_sql::plan::SourceReference {
635 name: source_reference.name,
636 namespace: source_reference.namespace,
637 columns: source_reference.columns,
638 }
639 }
640}
641
/// A catalog item together with its catalog-level metadata (ID, OID, name,
/// owner, privileges) and reverse-dependency lists.
///
/// Converts into [`durable::Item`].
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    /// The item itself.
    pub item: CatalogItem,
    /// IDs of items that reference this entry. Excluded from serialized output.
    /// NOTE(review): how this differs from `used_by` is not visible here.
    #[serde(skip)]
    pub referenced_by: Vec<CatalogItemId>,
    /// IDs of items that use this entry. Excluded from serialized output.
    #[serde(skip)]
    pub used_by: Vec<CatalogItemId>,
    /// Identifier of the entry.
    pub id: CatalogItemId,
    /// OID of the entry.
    pub oid: u32,
    /// Qualified name of the entry.
    pub name: QualifiedItemName,
    /// Role that owns the entry.
    pub owner_id: RoleId,
    /// Privileges recorded for the entry.
    pub privileges: PrivilegeMap,
}
658
/// A [`CatalogEntry`] paired with a selector for one version of its relation.
///
/// Dereferences to the wrapped entry; the selector is used when asking for the
/// relation description or the global ID.
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
    /// The wrapped catalog entry.
    pub entry: CatalogEntry,
    /// Which version of the entry's relation this value refers to.
    pub version: RelationVersionSelector,
}
678
679impl CatalogCollectionEntry {
680 pub fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
681 self.item().relation_desc(self.version)
682 }
683}
684
685impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
686 fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
687 self.item().relation_desc(self.version)
688 }
689
690 fn global_id(&self) -> GlobalId {
691 self.entry
692 .item()
693 .global_id_for_version(self.version)
694 .expect("catalog corruption, missing version!")
695 }
696}
697
698impl Deref for CatalogCollectionEntry {
699 type Target = CatalogEntry;
700
701 fn deref(&self) -> &CatalogEntry {
702 &self.entry
703 }
704}
705
/// Implements the SQL-layer catalog item interface by forwarding to the wrapped
/// [`CatalogEntry`].
///
/// Every method except `at_version` is a direct delegation to `self.entry`; the
/// `version` selector held by `self` plays no role in these methods.
impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
    /// Forwards to the wrapped entry.
    fn name(&self) -> &QualifiedItemName {
        self.entry.name()
    }

    /// Forwards to the wrapped entry.
    fn id(&self) -> CatalogItemId {
        self.entry.id()
    }

    /// Forwards to the wrapped entry, boxing the returned iterator.
    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.entry.global_ids())
    }

    /// Forwards to the wrapped entry.
    fn oid(&self) -> u32 {
        self.entry.oid()
    }

    /// Forwards to the wrapped entry.
    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.entry.func()
    }

    /// Forwards to the wrapped entry.
    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.entry.source_desc()
    }

    /// Forwards to the wrapped entry's implementation of this trait method.
    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        // Called by fully-qualified path rather than method syntax.
        // NOTE(review): presumably to avoid resolving to a different
        // `connection` method on `CatalogEntry` — confirm before changing.
        mz_sql::catalog::CatalogItem::connection(&self.entry)
    }

    /// Forwards to the wrapped entry.
    fn create_sql(&self) -> &str {
        self.entry.create_sql()
    }

    /// Forwards to the wrapped entry.
    fn item_type(&self) -> SqlCatalogItemType {
        self.entry.item_type()
    }

    /// Forwards to the wrapped entry.
    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        self.entry.index_details()
    }

    /// Forwards to the wrapped entry.
    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        self.entry.writable_table_details()
    }

    /// Forwards to the wrapped entry.
    fn replacement_target(&self) -> Option<CatalogItemId> {
        self.entry.replacement_target()
    }

    /// Forwards to the wrapped entry.
    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        self.entry.type_details()
    }

    /// Forwards to the wrapped entry.
    fn references(&self) -> &ResolvedIds {
        self.entry.references()
    }

    /// Forwards to the wrapped entry.
    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.entry.uses()
    }

    /// Forwards to the wrapped entry.
    fn referenced_by(&self) -> &[CatalogItemId] {
        self.entry.referenced_by()
    }

    /// Forwards to the wrapped entry.
    fn used_by(&self) -> &[CatalogItemId] {
        self.entry.used_by()
    }

    /// Forwards to the wrapped entry.
    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.entry.subsource_details()
    }

    /// Forwards to the wrapped entry.
    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.entry.source_export_details()
    }

    /// Forwards to the wrapped entry.
    fn is_progress_source(&self) -> bool {
        self.entry.is_progress_source()
    }

    /// Forwards to the wrapped entry.
    fn progress_id(&self) -> Option<CatalogItemId> {
        self.entry.progress_id()
    }

    /// Forwards to the wrapped entry, copying the owner ID out of the returned
    /// reference.
    fn owner_id(&self) -> RoleId {
        *self.entry.owner_id()
    }

    /// Forwards to the wrapped entry.
    fn privileges(&self) -> &PrivilegeMap {
        self.entry.privileges()
    }

    /// Forwards to the item held by the wrapped entry.
    fn cluster_id(&self) -> Option<ClusterId> {
        self.entry.item().cluster_id()
    }

    /// Returns a new boxed `CatalogCollectionEntry` that holds a clone of the
    /// wrapped entry and the requested `version` selector.
    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        Box::new(CatalogCollectionEntry {
            entry: self.entry.clone(),
            version,
        })
    }

    /// Forwards to the wrapped entry.
    fn latest_version(&self) -> Option<RelationVersion> {
        self.entry.latest_version()
    }
}
829
/// The kinds of item a [`CatalogEntry`] can hold, one variant per item type.
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    /// A table.
    Table(Table),
    /// A source.
    Source(Source),
    /// A log.
    Log(Log),
    /// A view.
    View(View),
    /// A materialized view.
    MaterializedView(MaterializedView),
    /// A sink.
    Sink(Sink),
    /// An index.
    Index(Index),
    /// A type.
    Type(Type),
    /// A function.
    Func(Func),
    /// A secret.
    Secret(Secret),
    /// A connection.
    Connection(Connection),
}
844
845impl From<CatalogEntry> for durable::Item {
846 fn from(entry: CatalogEntry) -> durable::Item {
847 let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
848 durable::Item {
849 id: entry.id,
850 oid: entry.oid,
851 global_id,
852 schema_id: entry.name.qualifiers.schema_spec.into(),
853 name: entry.name.item,
854 create_sql,
855 owner_id: entry.owner_id,
856 privileges: entry.privileges.into_all_values().collect(),
857 extra_versions,
858 }
859 }
860}
861
/// Catalog representation of a table, possibly with several relation versions.
#[derive(Debug, Clone, Serialize)]
pub struct Table {
    /// SQL text that creates the table, if available.
    pub create_sql: Option<String>,
    /// Versioned description of the table's relation.
    pub desc: VersionedRelationDesc,
    /// Global ID of the collection backing each relation version.
    /// Serialized through `map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// Connection ID associated with the table, if any. Excluded from
    /// serialized output.
    /// NOTE(review): presumably set for connection-scoped (temporary) tables — confirm.
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    /// IDs resolved for this table's definition.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window, if one is configured.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the table is flagged as a retained-metrics object.
    pub is_retained_metrics_object: bool,
    /// Where the table's data comes from.
    pub data_source: TableDataSource,
}
886
887impl Table {
888 pub fn timeline(&self) -> Timeline {
889 match &self.data_source {
890 TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
893 TableDataSource::DataSource { timeline, .. } => timeline.clone(),
894 }
895 }
896
897 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
899 self.collections.values().copied()
900 }
901
902 pub fn global_id_writes(&self) -> GlobalId {
904 *self
905 .collections
906 .last_key_value()
907 .expect("at least one version of a table")
908 .1
909 }
910
911 pub fn collection_descs(
913 &self,
914 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
915 self.collections.iter().map(|(version, gid)| {
916 let desc = self
917 .desc
918 .at_version(RelationVersionSelector::Specific(*version));
919 (*gid, *version, desc)
920 })
921 }
922
923 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
925 let (version, _gid) = self
926 .collections
927 .iter()
928 .find(|(_version, gid)| *gid == id)
929 .expect("GlobalId to exist");
930 self.desc
931 .at_version(RelationVersionSelector::Specific(*version))
932 }
933}
934
/// Where a [`Table`]'s data comes from.
#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
    /// The table receives writes; its timeline is `Timeline::EpochMilliseconds`
    /// (see [`Table::timeline`]).
    TableWrites {
        /// Default expressions. Excluded from serialized output.
        /// NOTE(review): presumably one per column — confirm.
        #[serde(skip)]
        defaults: Vec<Expr<Aug>>,
    },

    /// The table is backed by a data source with its own timeline.
    DataSource {
        /// Description of the data source.
        desc: DataSourceDesc,
        /// Timeline of the data source.
        timeline: Timeline,
    },
}
950
/// Describes where the data of a source (or source-backed table) comes from.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum DataSourceDesc {
    /// An ingestion running on a cluster.
    Ingestion {
        /// Description of the source.
        desc: SourceDesc<ReferencedConnection>,
        /// Cluster the ingestion runs on.
        cluster_id: ClusterId,
    },
    /// An ingestion that additionally carries a progress subsource and its own
    /// export configuration.
    /// NOTE(review): "old syntax" presumably refers to the legacy source syntax — confirm.
    OldSyntaxIngestion {
        /// Description of the source.
        desc: SourceDesc<ReferencedConnection>,
        /// Cluster the ingestion runs on.
        cluster_id: ClusterId,
        /// ID of the progress subsource.
        progress_subsource: CatalogItemId,
        /// Data configuration (encoding and envelope) of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
        /// Details of the export.
        details: SourceExportDetails,
    },
    /// An export of another ingestion.
    IngestionExport {
        /// ID of the ingestion this export belongs to.
        ingestion_id: CatalogItemId,
        /// Name of the exported object in the external system.
        external_reference: UnresolvedItemName,
        /// Details of the export.
        details: SourceExportDetails,
        /// Data configuration (encoding and envelope) of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// An introspection collection of the given type.
    Introspection(IntrospectionType),
    /// A progress collection.
    Progress,
    /// A webhook source running on a cluster.
    Webhook {
        /// Optional validation to apply to requests.
        validate_using: Option<WebhookValidation>,
        /// Format of the request body.
        body_format: WebhookBodyFormat,
        /// Header configuration.
        headers: WebhookHeaders,
        /// Cluster the webhook runs on.
        cluster_id: ClusterId,
    },
    /// A catalog collection.
    /// NOTE(review): exact meaning not visible in this section — confirm.
    Catalog,
}
999
1000impl From<IntrospectionType> for DataSourceDesc {
1001 fn from(typ: IntrospectionType) -> Self {
1002 Self::Introspection(typ)
1003 }
1004}
1005
1006impl DataSourceDesc {
1007 pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1009 match &self {
1010 DataSourceDesc::Ingestion { .. } => (None, None),
1011 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1012 match &data_config.encoding.as_ref() {
1013 Some(encoding) => match &encoding.key {
1014 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1015 None => (None, Some(encoding.value.type_())),
1016 },
1017 None => (None, None),
1018 }
1019 }
1020 DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1021 Some(encoding) => match &encoding.key {
1022 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1023 None => (None, Some(encoding.value.type_())),
1024 },
1025 None => (None, None),
1026 },
1027 DataSourceDesc::Introspection(_)
1028 | DataSourceDesc::Webhook { .. }
1029 | DataSourceDesc::Progress
1030 | DataSourceDesc::Catalog => (None, None),
1031 }
1032 }
1033
1034 pub fn envelope(&self) -> Option<&str> {
1036 fn envelope_string(envelope: &SourceEnvelope) -> &str {
1041 match envelope {
1042 SourceEnvelope::None(_) => "none",
1043 SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1044 mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1045 mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1046 "debezium"
1050 }
1051 mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1052 "upsert-value-err-inline"
1053 }
1054 },
1055 SourceEnvelope::CdcV2 => {
1056 "materialize"
1059 }
1060 }
1061 }
1062
1063 match self {
1064 DataSourceDesc::Ingestion { .. } => None,
1069 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1070 Some(envelope_string(&data_config.envelope))
1071 }
1072 DataSourceDesc::IngestionExport { data_config, .. } => {
1073 Some(envelope_string(&data_config.envelope))
1074 }
1075 DataSourceDesc::Introspection(_)
1076 | DataSourceDesc::Webhook { .. }
1077 | DataSourceDesc::Progress
1078 | DataSourceDesc::Catalog => None,
1079 }
1080 }
1081}
1082
/// Catalog representation of a source.
#[derive(Debug, Clone, Serialize)]
pub struct Source {
    /// SQL text that creates the source, if available.
    pub create_sql: Option<String>,
    /// Global ID of the source.
    pub global_id: GlobalId,
    /// Where the source's data comes from. Excluded from serialized output.
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    /// Description of the source's relation.
    pub desc: RelationDesc,
    /// Timeline of the source.
    pub timeline: Timeline,
    /// IDs resolved for this source's definition.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window, if one is configured.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the source is flagged as a retained-metrics object.
    pub is_retained_metrics_object: bool,
}
1106
1107impl Source {
1108 pub fn new(
1115 plan: CreateSourcePlan,
1116 global_id: GlobalId,
1117 resolved_ids: ResolvedIds,
1118 custom_logical_compaction_window: Option<CompactionWindow>,
1119 is_retained_metrics_object: bool,
1120 ) -> Source {
1121 Source {
1122 create_sql: Some(plan.source.create_sql),
1123 data_source: match plan.source.data_source {
1124 mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1125 desc,
1126 cluster_id: plan
1127 .in_cluster
1128 .expect("ingestion-based sources must be given a cluster ID"),
1129 },
1130 mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1131 desc,
1132 progress_subsource,
1133 data_config,
1134 details,
1135 } => DataSourceDesc::OldSyntaxIngestion {
1136 desc,
1137 cluster_id: plan
1138 .in_cluster
1139 .expect("ingestion-based sources must be given a cluster ID"),
1140 progress_subsource,
1141 data_config,
1142 details,
1143 },
1144 mz_sql::plan::DataSourceDesc::Progress => {
1145 assert!(
1146 plan.in_cluster.is_none(),
1147 "subsources must not have a host config or cluster_id defined"
1148 );
1149 DataSourceDesc::Progress
1150 }
1151 mz_sql::plan::DataSourceDesc::IngestionExport {
1152 ingestion_id,
1153 external_reference,
1154 details,
1155 data_config,
1156 } => {
1157 assert!(
1158 plan.in_cluster.is_none(),
1159 "subsources must not have a host config or cluster_id defined"
1160 );
1161 DataSourceDesc::IngestionExport {
1162 ingestion_id,
1163 external_reference,
1164 details,
1165 data_config,
1166 }
1167 }
1168 mz_sql::plan::DataSourceDesc::Webhook {
1169 validate_using,
1170 body_format,
1171 headers,
1172 cluster_id,
1173 } => {
1174 mz_ore::soft_assert_or_log!(
1175 cluster_id.is_none(),
1176 "cluster_id set at Source level for Webhooks"
1177 );
1178 DataSourceDesc::Webhook {
1179 validate_using,
1180 body_format,
1181 headers,
1182 cluster_id: plan
1183 .in_cluster
1184 .expect("webhook sources must be given a cluster ID"),
1185 }
1186 }
1187 },
1188 desc: plan.source.desc,
1189 global_id,
1190 timeline: plan.timeline,
1191 resolved_ids,
1192 custom_logical_compaction_window: plan
1193 .source
1194 .compaction_window
1195 .or(custom_logical_compaction_window),
1196 is_retained_metrics_object,
1197 }
1198 }
1199
1200 pub fn source_type(&self) -> &str {
1202 match &self.data_source {
1203 DataSourceDesc::Ingestion { desc, .. }
1204 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
1205 DataSourceDesc::Progress => "progress",
1206 DataSourceDesc::IngestionExport { .. } => "subsource",
1207 DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => "source",
1208 DataSourceDesc::Webhook { .. } => "webhook",
1209 }
1210 }
1211
1212 pub fn connection_id(&self) -> Option<CatalogItemId> {
1214 match &self.data_source {
1215 DataSourceDesc::Ingestion { desc, .. }
1216 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
1217 DataSourceDesc::IngestionExport { .. }
1218 | DataSourceDesc::Introspection(_)
1219 | DataSourceDesc::Webhook { .. }
1220 | DataSourceDesc::Progress
1221 | DataSourceDesc::Catalog => None,
1222 }
1223 }
1224
1225 pub fn global_id(&self) -> GlobalId {
1227 self.global_id
1228 }
1229
1230 pub fn user_controllable_persist_shard_count(&self) -> i64 {
1238 match &self.data_source {
1239 DataSourceDesc::Ingestion { .. } => 0,
1240 DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1241 match &desc.connection {
1242 GenericSourceConnection::Postgres(_)
1246 | GenericSourceConnection::MySql(_)
1247 | GenericSourceConnection::SqlServer(_) => 0,
1248 GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
1249 LoadGenerator::Clock
1251 | LoadGenerator::Counter { .. }
1252 | LoadGenerator::Datums
1253 | LoadGenerator::KeyValue(_) => 1,
1254 LoadGenerator::Auction
1255 | LoadGenerator::Marketing
1256 | LoadGenerator::Tpch { .. } => 0,
1257 },
1258 GenericSourceConnection::Kafka(_) => 1,
1259 }
1260 }
1261 DataSourceDesc::IngestionExport { .. } => 1,
1264 DataSourceDesc::Webhook { .. } => 1,
1265 DataSourceDesc::Introspection(_)
1268 | DataSourceDesc::Progress
1269 | DataSourceDesc::Catalog => 0,
1270 }
1271 }
1272}
1273
/// Catalog representation of a log: a log variant and the global ID assigned to it.
#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// Which log this is.
    pub variant: LogVariant,
    /// Global ID of the log.
    pub global_id: GlobalId,
}
1281
1282impl Log {
1283 pub fn global_id(&self) -> GlobalId {
1285 self.global_id
1286 }
1287}
1288
/// Catalog representation of a sink.
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    /// SQL text that creates the sink.
    pub create_sql: String,
    /// Global ID of the sink.
    pub global_id: GlobalId,
    /// Global ID of the collection the sink reads from.
    pub from: GlobalId,
    /// Connection the sink writes to.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// Envelope used by the sink.
    pub envelope: SinkEnvelope,
    /// Snapshot flag.
    /// NOTE(review): presumably whether an initial snapshot is emitted — confirm.
    pub with_snapshot: bool,
    /// Version number of the sink.
    /// NOTE(review): what increments it is not visible here.
    pub version: u64,
    /// IDs resolved for this sink's definition.
    pub resolved_ids: ResolvedIds,
    /// Cluster the sink runs on.
    pub cluster_id: ClusterId,
    /// Commit interval, if one is configured.
    pub commit_interval: Option<Duration>,
}
1314
1315impl Sink {
1316 pub fn sink_type(&self) -> &str {
1317 self.connection.name()
1318 }
1319
1320 pub fn envelope(&self) -> Option<&str> {
1322 match &self.envelope {
1323 SinkEnvelope::Debezium => Some("debezium"),
1324 SinkEnvelope::Upsert => Some("upsert"),
1325 SinkEnvelope::Append => Some("append"),
1326 }
1327 }
1328
1329 pub fn combined_format(&self) -> Option<Cow<'_, str>> {
1334 match &self.connection {
1335 StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
1336 StorageSinkConnection::Iceberg(_) => None,
1337 }
1338 }
1339
1340 pub fn formats(&self) -> Option<(Option<&str>, &str)> {
1342 match &self.connection {
1343 StorageSinkConnection::Kafka(connection) => {
1344 let key_format = connection
1345 .format
1346 .key_format
1347 .as_ref()
1348 .map(|f| f.get_format_name());
1349 let value_format = connection.format.value_format.get_format_name();
1350 Some((key_format, value_format))
1351 }
1352 StorageSinkConnection::Iceberg(_) => None,
1353 }
1354 }
1355
1356 pub fn connection_id(&self) -> Option<CatalogItemId> {
1357 self.connection.connection_id()
1358 }
1359
1360 pub fn global_id(&self) -> GlobalId {
1362 self.global_id
1363 }
1364}
1365
/// A (non-materialized) view catalog item.
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// The SQL `CREATE` statement for this view.
    pub create_sql: String,
    /// The [`GlobalId`] of this view.
    pub global_id: GlobalId,
    /// The view's expression in HIR form.
    pub raw_expr: Arc<HirRelationExpr>,
    /// The view's expression in optimized MIR form.
    // NOTE(review): name suggests only local optimization was applied — confirm.
    pub locally_optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// The relation description of the view (returned by `CatalogItem::relation_desc`).
    pub desc: RelationDesc,
    /// The connection that owns this view, if any. A set value makes the item
    /// temporary (see `CatalogItem::is_temporary`).
    pub conn_id: Option<ConnectionId>,
    /// The IDs this view's definition refers to (returned by `CatalogItem::references`).
    pub resolved_ids: ResolvedIds,
    /// Additional dependency IDs, merged with `resolved_ids` by `CatalogItem::uses`.
    pub dependencies: DependencyIds,
}
1385
1386impl View {
1387 pub fn global_id(&self) -> GlobalId {
1389 self.global_id
1390 }
1391}
1392
/// A materialized view catalog item.
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// The SQL `CREATE` statement for this materialized view.
    pub create_sql: String,
    /// The [`GlobalId`] of every version of this materialized view, keyed by
    /// [`RelationVersion`]. Expected to contain at least one entry.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// The view's expression in HIR form.
    pub raw_expr: Arc<HirRelationExpr>,
    /// The view's expression in optimized MIR form.
    // NOTE(review): name suggests only local optimization was applied — confirm.
    pub locally_optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// The versioned relation description; queried per version via `at_version`.
    pub desc: VersionedRelationDesc,
    /// The IDs this definition refers to (returned by `CatalogItem::references`).
    pub resolved_ids: ResolvedIds,
    /// Additional dependency IDs, merged with `resolved_ids` by `CatalogItem::uses`.
    pub dependencies: DependencyIds,
    /// The item this materialized view is a replacement for, if any. Must be
    /// set on the argument passed to [`MaterializedView::apply_replacement`].
    pub replacement_target: Option<CatalogItemId>,
    /// The cluster this materialized view is associated with.
    pub cluster_id: ClusterId,
    /// Optional replica this materialized view is targeted at.
    // NOTE(review): exact scheduling semantics are not visible here — confirm.
    pub target_replica: Option<ReplicaId>,
    /// Indexes of columns carrying non-null assertions.
    // NOTE(review): presumably positions into the relation's columns — confirm.
    pub non_null_assertions: Vec<usize>,
    /// Custom compaction window, if one was configured. `None` means the
    /// default is used (see `CatalogItem::initial_logical_compaction_window`).
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Optional refresh schedule of the materialized view.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// Optional initial as-of frontier.
    // NOTE(review): when this is populated is not visible here — confirm.
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
    /// Optimized (MIR) dataflow plan, if available. Not serialized.
    #[serde(skip)]
    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical (compute) dataflow plan, if available. Not serialized.
    #[serde(skip)]
    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
    /// Dataflow metainfo (e.g. optimizer notices), if available. Not serialized.
    #[serde(skip)]
    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
}
1443
1444impl MaterializedView {
1445 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1447 self.collections.values().copied()
1448 }
1449
1450 pub fn global_id_writes(&self) -> GlobalId {
1453 *self
1454 .collections
1455 .last_key_value()
1456 .expect("at least one version of a materialized view")
1457 .1
1458 }
1459
1460 pub fn collection_descs(
1462 &self,
1463 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
1464 self.collections.iter().map(|(version, gid)| {
1465 let desc = self
1466 .desc
1467 .at_version(RelationVersionSelector::Specific(*version));
1468 (*gid, *version, desc)
1469 })
1470 }
1471
1472 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
1474 let (version, _gid) = self
1475 .collections
1476 .iter()
1477 .find(|(_version, gid)| *gid == id)
1478 .expect("GlobalId to exist");
1479 self.desc
1480 .at_version(RelationVersionSelector::Specific(*version))
1481 }
1482
1483 pub fn apply_replacement(&mut self, replacement: Self) {
1485 let target_id = replacement
1486 .replacement_target
1487 .expect("replacement has target");
1488
1489 fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
1490 let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
1491 panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
1492 });
1493 if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
1494 cmvs
1495 } else {
1496 panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
1497 }
1498 }
1499
1500 let old_stmt = parse(&self.create_sql);
1501 let rpl_stmt = parse(&replacement.create_sql);
1502 let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
1503 if_exists: old_stmt.if_exists,
1504 name: old_stmt.name,
1505 columns: rpl_stmt.columns,
1506 replacement_for: None,
1507 in_cluster: rpl_stmt.in_cluster,
1508 in_cluster_replica: rpl_stmt.in_cluster_replica,
1509 query: rpl_stmt.query,
1510 as_of: rpl_stmt.as_of,
1511 with_options: rpl_stmt.with_options,
1512 };
1513 let create_sql = new_stmt.to_ast_string_stable();
1514
1515 let mut collections = std::mem::take(&mut self.collections);
1516 let latest_version = collections.keys().max().expect("at least one version");
1520 let new_version = latest_version.bump();
1521 collections.insert(new_version, replacement.global_id_writes());
1522
1523 let mut resolved_ids = replacement.resolved_ids;
1524 resolved_ids.remove_item(&target_id);
1525 let mut dependencies = replacement.dependencies;
1526 dependencies.0.remove(&target_id);
1527
1528 *self = Self {
1529 create_sql,
1530 collections,
1531 raw_expr: replacement.raw_expr,
1532 locally_optimized_expr: replacement.locally_optimized_expr,
1533 desc: replacement.desc,
1534 resolved_ids,
1535 dependencies,
1536 replacement_target: None,
1537 cluster_id: replacement.cluster_id,
1538 target_replica: replacement.target_replica,
1539 non_null_assertions: replacement.non_null_assertions,
1540 custom_logical_compaction_window: replacement.custom_logical_compaction_window,
1541 refresh_schedule: replacement.refresh_schedule,
1542 initial_as_of: replacement.initial_as_of,
1543 optimized_plan: replacement.optimized_plan,
1544 physical_plan: replacement.physical_plan,
1545 dataflow_metainfo: replacement.dataflow_metainfo,
1546 };
1547 }
1548}
1549
/// An index catalog item.
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// The SQL `CREATE` statement for this index.
    pub create_sql: String,
    /// The [`GlobalId`] of this index.
    pub global_id: GlobalId,
    /// The [`GlobalId`] of the collection this index is built on.
    pub on: GlobalId,
    /// The key expressions of the index.
    pub keys: Arc<[MirScalarExpr]>,
    /// The connection that owns this index, if any. A set value makes the item
    /// temporary (see `CatalogItem::is_temporary`).
    pub conn_id: Option<ConnectionId>,
    /// The IDs this index's definition refers to (returned by `CatalogItem::references`).
    pub resolved_ids: ResolvedIds,
    /// The cluster this index is associated with.
    pub cluster_id: ClusterId,
    /// Custom compaction window, if one was configured. `None` means the
    /// default is used (see `CatalogItem::initial_logical_compaction_window`).
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether this index is flagged as a retained-metrics object.
    // NOTE(review): the effect of this flag is not visible in this part of the file.
    pub is_retained_metrics_object: bool,
    /// Optimized (MIR) dataflow plan, if available. Not serialized.
    #[serde(skip)]
    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical (compute) dataflow plan, if available. Not serialized.
    #[serde(skip)]
    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
    /// Dataflow metainfo (e.g. optimizer notices), if available. Not serialized.
    #[serde(skip)]
    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
}
1587
1588impl Index {
1589 pub fn global_id(&self) -> GlobalId {
1591 self.global_id
1592 }
1593}
1594
/// A type catalog item.
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// The SQL `CREATE` statement for this type, if it has one.
    // NOTE(review): presumably `None` for builtin types — confirm.
    pub create_sql: Option<String>,
    /// The [`GlobalId`] of this type.
    pub global_id: GlobalId,
    /// Details describing the type. Not serialized.
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// The IDs this type's definition refers to (returned by `CatalogItem::references`).
    pub resolved_ids: ResolvedIds,
}
1606
/// A function catalog item.
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// The statically defined function implementation. Not serialized.
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
    /// The [`GlobalId`] of this function.
    pub global_id: GlobalId,
}
1615
/// A secret catalog item.
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// The SQL `CREATE` statement for this secret.
    pub create_sql: String,
    /// The [`GlobalId`] of this secret.
    pub global_id: GlobalId,
}
1623
/// A connection catalog item.
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// The SQL `CREATE` statement for this connection.
    pub create_sql: String,
    /// The [`GlobalId`] of this connection.
    pub global_id: GlobalId,
    /// The details of the connection.
    pub details: ConnectionDetails,
    /// The IDs this connection's definition refers to (returned by `CatalogItem::references`).
    pub resolved_ids: ResolvedIds,
}
1635
1636impl Connection {
1637 pub fn global_id(&self) -> GlobalId {
1639 self.global_id
1640 }
1641}
1642
/// In-memory representation of a network policy.
///
/// Converts to and from `durable::NetworkPolicy`; the only representational
/// difference visible here is that privileges are held in a [`PrivilegeMap`].
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
    /// The name of the policy.
    pub name: String,
    /// The ID of the policy.
    pub id: NetworkPolicyId,
    /// The OID of the policy.
    pub oid: u32,
    /// The rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
    /// The role that owns the policy.
    pub owner_id: RoleId,
    /// The privileges granted on the policy.
    pub privileges: PrivilegeMap,
}
1652
1653impl From<NetworkPolicy> for durable::NetworkPolicy {
1654 fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1655 durable::NetworkPolicy {
1656 id: policy.id,
1657 oid: policy.oid,
1658 name: policy.name,
1659 rules: policy.rules,
1660 owner_id: policy.owner_id,
1661 privileges: policy.privileges.into_all_values().collect(),
1662 }
1663 }
1664}
1665
1666impl From<durable::NetworkPolicy> for NetworkPolicy {
1667 fn from(
1668 durable::NetworkPolicy {
1669 id,
1670 oid,
1671 name,
1672 rules,
1673 owner_id,
1674 privileges,
1675 }: durable::NetworkPolicy,
1676 ) -> Self {
1677 NetworkPolicy {
1678 id,
1679 oid,
1680 name,
1681 rules,
1682 owner_id,
1683 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1684 }
1685 }
1686}
1687
1688impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1689 fn update_from(
1690 &mut self,
1691 durable::NetworkPolicy {
1692 id,
1693 oid,
1694 name,
1695 rules,
1696 owner_id,
1697 privileges,
1698 }: durable::NetworkPolicy,
1699 ) {
1700 self.id = id;
1701 self.oid = oid;
1702 self.name = name;
1703 self.rules = rules;
1704 self.owner_id = owner_id;
1705 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1706 }
1707}
1708
1709impl CatalogItem {
1710 pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1712 match self {
1713 CatalogItem::Table(_) => CatalogItemType::Table,
1714 CatalogItem::Source(_) => CatalogItemType::Source,
1715 CatalogItem::Log(_) => CatalogItemType::Source,
1716 CatalogItem::Sink(_) => CatalogItemType::Sink,
1717 CatalogItem::View(_) => CatalogItemType::View,
1718 CatalogItem::MaterializedView(_) => CatalogItemType::MaterializedView,
1719 CatalogItem::Index(_) => CatalogItemType::Index,
1720 CatalogItem::Type(_) => CatalogItemType::Type,
1721 CatalogItem::Func(_) => CatalogItemType::Func,
1722 CatalogItem::Secret(_) => CatalogItemType::Secret,
1723 CatalogItem::Connection(_) => CatalogItemType::Connection,
1724 }
1725 }
1726
1727 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1729 let gid = match self {
1730 CatalogItem::Source(source) => source.global_id,
1731 CatalogItem::Log(log) => log.global_id,
1732 CatalogItem::Sink(sink) => sink.global_id,
1733 CatalogItem::View(view) => view.global_id,
1734 CatalogItem::MaterializedView(mv) => {
1735 return itertools::Either::Left(mv.collections.values().copied());
1736 }
1737 CatalogItem::Index(index) => index.global_id,
1738 CatalogItem::Func(func) => func.global_id,
1739 CatalogItem::Type(ty) => ty.global_id,
1740 CatalogItem::Secret(secret) => secret.global_id,
1741 CatalogItem::Connection(conn) => conn.global_id,
1742 CatalogItem::Table(table) => {
1743 return itertools::Either::Left(table.collections.values().copied());
1744 }
1745 };
1746 itertools::Either::Right(std::iter::once(gid))
1747 }
1748
1749 pub fn latest_global_id(&self) -> GlobalId {
1753 match self {
1754 CatalogItem::Source(source) => source.global_id,
1755 CatalogItem::Log(log) => log.global_id,
1756 CatalogItem::Sink(sink) => sink.global_id,
1757 CatalogItem::View(view) => view.global_id,
1758 CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
1759 CatalogItem::Index(index) => index.global_id,
1760 CatalogItem::Func(func) => func.global_id,
1761 CatalogItem::Type(ty) => ty.global_id,
1762 CatalogItem::Secret(secret) => secret.global_id,
1763 CatalogItem::Connection(conn) => conn.global_id,
1764 CatalogItem::Table(table) => table.global_id_writes(),
1765 }
1766 }
1767
1768 pub fn optimized_plan(&self) -> Option<&Arc<DataflowDescription<OptimizedMirRelationExpr>>> {
1770 match self {
1771 CatalogItem::Index(idx) => idx.optimized_plan.as_ref(),
1772 CatalogItem::MaterializedView(mv) => mv.optimized_plan.as_ref(),
1773 _ => None,
1774 }
1775 }
1776
1777 pub fn physical_plan(&self) -> Option<&Arc<DataflowDescription<ComputePlan>>> {
1779 match self {
1780 CatalogItem::Index(idx) => idx.physical_plan.as_ref(),
1781 CatalogItem::MaterializedView(mv) => mv.physical_plan.as_ref(),
1782 _ => None,
1783 }
1784 }
1785
1786 pub fn dataflow_metainfo(&self) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
1788 match self {
1789 CatalogItem::Index(idx) => idx.dataflow_metainfo.as_ref(),
1790 CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo.as_ref(),
1791 _ => None,
1792 }
1793 }
1794
1795 pub fn plan_fields_mut(
1800 &mut self,
1801 ) -> Option<(
1802 &mut Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
1803 &mut Option<Arc<DataflowDescription<ComputePlan>>>,
1804 &mut Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
1805 )> {
1806 match self {
1807 CatalogItem::Index(idx) => Some((
1808 &mut idx.optimized_plan,
1809 &mut idx.physical_plan,
1810 &mut idx.dataflow_metainfo,
1811 )),
1812 CatalogItem::MaterializedView(mv) => Some((
1813 &mut mv.optimized_plan,
1814 &mut mv.physical_plan,
1815 &mut mv.dataflow_metainfo,
1816 )),
1817 _ => None,
1818 }
1819 }
1820
1821 pub fn is_storage_collection(&self) -> bool {
1823 match self {
1824 CatalogItem::Table(_)
1825 | CatalogItem::Source(_)
1826 | CatalogItem::MaterializedView(_)
1827 | CatalogItem::Sink(_) => true,
1828 CatalogItem::Log(_)
1829 | CatalogItem::View(_)
1830 | CatalogItem::Index(_)
1831 | CatalogItem::Type(_)
1832 | CatalogItem::Func(_)
1833 | CatalogItem::Secret(_)
1834 | CatalogItem::Connection(_) => false,
1835 }
1836 }
1837
1838 pub fn relation_desc(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1847 match &self {
1848 CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1849 CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1850 CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1851 CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1852 CatalogItem::MaterializedView(mview) => {
1853 Some(Cow::Owned(mview.desc.at_version(version)))
1854 }
1855 CatalogItem::Func(_)
1856 | CatalogItem::Index(_)
1857 | CatalogItem::Sink(_)
1858 | CatalogItem::Secret(_)
1859 | CatalogItem::Connection(_)
1860 | CatalogItem::Type(_) => None,
1861 }
1862 }
1863
1864 pub fn func(
1865 &self,
1866 entry: &CatalogEntry,
1867 ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1868 match &self {
1869 CatalogItem::Func(func) => Ok(func.inner),
1870 _ => Err(SqlCatalogError::UnexpectedType {
1871 name: entry.name().item.to_string(),
1872 actual_type: entry.item_type(),
1873 expected_type: CatalogItemType::Func,
1874 }),
1875 }
1876 }
1877
1878 pub fn source_desc(
1879 &self,
1880 entry: &CatalogEntry,
1881 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1882 match &self {
1883 CatalogItem::Source(source) => match &source.data_source {
1884 DataSourceDesc::Ingestion { desc, .. }
1885 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1886 DataSourceDesc::IngestionExport { .. }
1887 | DataSourceDesc::Introspection(_)
1888 | DataSourceDesc::Webhook { .. }
1889 | DataSourceDesc::Progress
1890 | DataSourceDesc::Catalog => Ok(None),
1891 },
1892 _ => Err(SqlCatalogError::UnexpectedType {
1893 name: entry.name().item.to_string(),
1894 actual_type: entry.item_type(),
1895 expected_type: CatalogItemType::Source,
1896 }),
1897 }
1898 }
1899
1900 pub fn is_progress_source(&self) -> bool {
1902 matches!(
1903 self,
1904 CatalogItem::Source(Source {
1905 data_source: DataSourceDesc::Progress,
1906 ..
1907 })
1908 )
1909 }
1910
1911 pub fn references(&self) -> &ResolvedIds {
1914 static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1915 match self {
1916 CatalogItem::Func(_) => &*EMPTY,
1917 CatalogItem::Index(idx) => &idx.resolved_ids,
1918 CatalogItem::Sink(sink) => &sink.resolved_ids,
1919 CatalogItem::Source(source) => &source.resolved_ids,
1920 CatalogItem::Log(_) => &*EMPTY,
1921 CatalogItem::Table(table) => &table.resolved_ids,
1922 CatalogItem::Type(typ) => &typ.resolved_ids,
1923 CatalogItem::View(view) => &view.resolved_ids,
1924 CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1925 CatalogItem::Secret(_) => &*EMPTY,
1926 CatalogItem::Connection(connection) => &connection.resolved_ids,
1927 }
1928 }
1929
1930 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1936 let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1937 match self {
1938 CatalogItem::Func(_) => {}
1941 CatalogItem::Index(_) => {}
1942 CatalogItem::Sink(_) => {}
1943 CatalogItem::Source(_) => {}
1944 CatalogItem::Log(_) => {}
1945 CatalogItem::Table(_) => {}
1946 CatalogItem::Type(_) => {}
1947 CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1948 CatalogItem::MaterializedView(mview) => {
1949 uses.extend(mview.dependencies.0.iter().copied())
1950 }
1951 CatalogItem::Secret(_) => {}
1952 CatalogItem::Connection(_) => {}
1953 }
1954 uses
1955 }
1956
1957 pub fn conn_id(&self) -> Option<&ConnectionId> {
1960 match self {
1961 CatalogItem::View(view) => view.conn_id.as_ref(),
1962 CatalogItem::Index(index) => index.conn_id.as_ref(),
1963 CatalogItem::Table(table) => table.conn_id.as_ref(),
1964 CatalogItem::Log(_)
1965 | CatalogItem::Source(_)
1966 | CatalogItem::Sink(_)
1967 | CatalogItem::MaterializedView(_)
1968 | CatalogItem::Secret(_)
1969 | CatalogItem::Type(_)
1970 | CatalogItem::Func(_)
1971 | CatalogItem::Connection(_) => None,
1972 }
1973 }
1974
1975 pub fn set_conn_id(&mut self, conn_id: Option<ConnectionId>) {
1978 match self {
1979 CatalogItem::View(view) => view.conn_id = conn_id,
1980 CatalogItem::Index(index) => index.conn_id = conn_id,
1981 CatalogItem::Table(table) => table.conn_id = conn_id,
1982 CatalogItem::Log(_)
1983 | CatalogItem::Source(_)
1984 | CatalogItem::Sink(_)
1985 | CatalogItem::MaterializedView(_)
1986 | CatalogItem::Secret(_)
1987 | CatalogItem::Type(_)
1988 | CatalogItem::Func(_)
1989 | CatalogItem::Connection(_) => (),
1990 }
1991 }
1992
1993 pub fn is_temporary(&self) -> bool {
1995 self.conn_id().is_some()
1996 }
1997
1998 pub fn rename_schema_refs(
1999 &self,
2000 database_name: &str,
2001 cur_schema_name: &str,
2002 new_schema_name: &str,
2003 ) -> Result<CatalogItem, (String, String)> {
2004 let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
2005 let mut create_stmt = mz_sql::parse::parse(&create_sql)
2006 .expect("invalid create sql persisted to catalog")
2007 .into_element()
2008 .ast;
2009
2010 mz_sql::ast::transform::create_stmt_rename_schema_refs(
2012 &mut create_stmt,
2013 database_name,
2014 cur_schema_name,
2015 new_schema_name,
2016 )?;
2017
2018 Ok(create_stmt.to_ast_string_stable())
2019 };
2020
2021 match self {
2022 CatalogItem::Table(i) => {
2023 let mut i = i.clone();
2024 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2025 Ok(CatalogItem::Table(i))
2026 }
2027 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
2028 CatalogItem::Source(i) => {
2029 let mut i = i.clone();
2030 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2031 Ok(CatalogItem::Source(i))
2032 }
2033 CatalogItem::Sink(i) => {
2034 let mut i = i.clone();
2035 i.create_sql = do_rewrite(i.create_sql)?;
2036 Ok(CatalogItem::Sink(i))
2037 }
2038 CatalogItem::View(i) => {
2039 let mut i = i.clone();
2040 i.create_sql = do_rewrite(i.create_sql)?;
2041 Ok(CatalogItem::View(i))
2042 }
2043 CatalogItem::MaterializedView(i) => {
2044 let mut i = i.clone();
2045 i.create_sql = do_rewrite(i.create_sql)?;
2046 Ok(CatalogItem::MaterializedView(i))
2047 }
2048 CatalogItem::Index(i) => {
2049 let mut i = i.clone();
2050 i.create_sql = do_rewrite(i.create_sql)?;
2051 Ok(CatalogItem::Index(i))
2052 }
2053 CatalogItem::Secret(i) => {
2054 let mut i = i.clone();
2055 i.create_sql = do_rewrite(i.create_sql)?;
2056 Ok(CatalogItem::Secret(i))
2057 }
2058 CatalogItem::Connection(i) => {
2059 let mut i = i.clone();
2060 i.create_sql = do_rewrite(i.create_sql)?;
2061 Ok(CatalogItem::Connection(i))
2062 }
2063 CatalogItem::Type(i) => {
2064 let mut i = i.clone();
2065 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2066 Ok(CatalogItem::Type(i))
2067 }
2068 CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
2069 }
2070 }
2071
2072 pub fn rename_item_refs(
2076 &self,
2077 from: FullItemName,
2078 to_item_name: String,
2079 rename_self: bool,
2080 ) -> Result<CatalogItem, String> {
2081 let do_rewrite = |create_sql: String| -> Result<String, String> {
2082 let mut create_stmt = mz_sql::parse::parse(&create_sql)
2083 .expect("invalid create sql persisted to catalog")
2084 .into_element()
2085 .ast;
2086 if rename_self {
2087 mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
2088 }
2089 mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
2091 Ok(create_stmt.to_ast_string_stable())
2092 };
2093
2094 match self {
2095 CatalogItem::Table(i) => {
2096 let mut i = i.clone();
2097 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2098 Ok(CatalogItem::Table(i))
2099 }
2100 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
2101 CatalogItem::Source(i) => {
2102 let mut i = i.clone();
2103 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2104 Ok(CatalogItem::Source(i))
2105 }
2106 CatalogItem::Sink(i) => {
2107 let mut i = i.clone();
2108 i.create_sql = do_rewrite(i.create_sql)?;
2109 Ok(CatalogItem::Sink(i))
2110 }
2111 CatalogItem::View(i) => {
2112 let mut i = i.clone();
2113 i.create_sql = do_rewrite(i.create_sql)?;
2114 Ok(CatalogItem::View(i))
2115 }
2116 CatalogItem::MaterializedView(i) => {
2117 let mut i = i.clone();
2118 i.create_sql = do_rewrite(i.create_sql)?;
2119 Ok(CatalogItem::MaterializedView(i))
2120 }
2121 CatalogItem::Index(i) => {
2122 let mut i = i.clone();
2123 i.create_sql = do_rewrite(i.create_sql)?;
2124 Ok(CatalogItem::Index(i))
2125 }
2126 CatalogItem::Secret(i) => {
2127 let mut i = i.clone();
2128 i.create_sql = do_rewrite(i.create_sql)?;
2129 Ok(CatalogItem::Secret(i))
2130 }
2131 CatalogItem::Func(_) | CatalogItem::Type(_) => {
2132 unreachable!("{}s cannot be renamed", self.typ())
2133 }
2134 CatalogItem::Connection(i) => {
2135 let mut i = i.clone();
2136 i.create_sql = do_rewrite(i.create_sql)?;
2137 Ok(CatalogItem::Connection(i))
2138 }
2139 }
2140 }
2141
2142 pub fn replace_item_refs(&self, old_id: CatalogItemId, new_id: CatalogItemId) -> CatalogItem {
2144 let do_rewrite = |create_sql: String| -> String {
2145 let mut create_stmt = mz_sql::parse::parse(&create_sql)
2146 .expect("invalid create sql persisted to catalog")
2147 .into_element()
2148 .ast;
2149 mz_sql::ast::transform::create_stmt_replace_ids(
2150 &mut create_stmt,
2151 &[(old_id, new_id)].into(),
2152 );
2153 create_stmt.to_ast_string_stable()
2154 };
2155
2156 match self {
2157 CatalogItem::Table(i) => {
2158 let mut i = i.clone();
2159 i.create_sql = i.create_sql.map(do_rewrite);
2160 CatalogItem::Table(i)
2161 }
2162 CatalogItem::Log(i) => CatalogItem::Log(i.clone()),
2163 CatalogItem::Source(i) => {
2164 let mut i = i.clone();
2165 i.create_sql = i.create_sql.map(do_rewrite);
2166 CatalogItem::Source(i)
2167 }
2168 CatalogItem::Sink(i) => {
2169 let mut i = i.clone();
2170 i.create_sql = do_rewrite(i.create_sql);
2171 CatalogItem::Sink(i)
2172 }
2173 CatalogItem::View(i) => {
2174 let mut i = i.clone();
2175 i.create_sql = do_rewrite(i.create_sql);
2176 CatalogItem::View(i)
2177 }
2178 CatalogItem::MaterializedView(i) => {
2179 let mut i = i.clone();
2180 i.create_sql = do_rewrite(i.create_sql);
2181 CatalogItem::MaterializedView(i)
2182 }
2183 CatalogItem::Index(i) => {
2184 let mut i = i.clone();
2185 i.create_sql = do_rewrite(i.create_sql);
2186 CatalogItem::Index(i)
2187 }
2188 CatalogItem::Secret(i) => {
2189 let mut i = i.clone();
2190 i.create_sql = do_rewrite(i.create_sql);
2191 CatalogItem::Secret(i)
2192 }
2193 CatalogItem::Func(_) | CatalogItem::Type(_) => {
2194 unreachable!("references of {}s cannot be replaced", self.typ())
2195 }
2196 CatalogItem::Connection(i) => {
2197 let mut i = i.clone();
2198 i.create_sql = do_rewrite(i.create_sql);
2199 CatalogItem::Connection(i)
2200 }
2201 }
2202 }
2203 pub fn update_retain_history(
2206 &mut self,
2207 value: Option<Value>,
2208 window: CompactionWindow,
2209 ) -> Result<Option<WithOptionValue<Raw>>, ()> {
2210 let update = |mut ast: &mut Statement<Raw>| {
2211 macro_rules! update_retain_history {
2213 ( $stmt:ident, $opt:ident, $name:ident ) => {{
2214 let pos = $stmt
2216 .with_options
2217 .iter()
2218 .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
2220 if let Some(value) = value {
2221 let next = mz_sql_parser::ast::$opt {
2222 name: mz_sql_parser::ast::$name::RetainHistory,
2223 value: Some(WithOptionValue::RetainHistoryFor(value)),
2224 };
2225 if let Some(idx) = pos {
2226 let previous = $stmt.with_options[idx].clone();
2227 $stmt.with_options[idx] = next;
2228 previous.value
2229 } else {
2230 $stmt.with_options.push(next);
2231 None
2232 }
2233 } else {
2234 if let Some(idx) = pos {
2235 $stmt.with_options.swap_remove(idx).value
2236 } else {
2237 None
2238 }
2239 }
2240 }};
2241 }
2242 let previous = match &mut ast {
2243 Statement::CreateTable(stmt) => {
2244 update_retain_history!(stmt, TableOption, TableOptionName)
2245 }
2246 Statement::CreateIndex(stmt) => {
2247 update_retain_history!(stmt, IndexOption, IndexOptionName)
2248 }
2249 Statement::CreateSource(stmt) => {
2250 update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
2251 }
2252 Statement::CreateMaterializedView(stmt) => {
2253 update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
2254 }
2255 _ => {
2256 return Err(());
2257 }
2258 };
2259 Ok(previous)
2260 };
2261
2262 let res = self.update_sql(update)?;
2263 let cw = self
2264 .custom_logical_compaction_window_mut()
2265 .expect("item must have compaction window");
2266 *cw = Some(window);
2267 Ok(res)
2268 }
2269
2270 pub fn update_timestamp_interval(
2272 &mut self,
2273 value: Option<Value>,
2274 interval: Duration,
2275 ) -> Result<(), ()> {
2276 let update = |ast: &mut Statement<Raw>| {
2277 match ast {
2278 Statement::CreateSource(stmt) => {
2279 let pos = stmt.with_options.iter().rposition(|o| {
2280 o.name == mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval
2281 });
2282 if let Some(value) = value {
2283 let next = mz_sql_parser::ast::CreateSourceOption {
2284 name: mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval,
2285 value: Some(WithOptionValue::Value(value)),
2286 };
2287 if let Some(idx) = pos {
2288 stmt.with_options[idx] = next;
2289 } else {
2290 stmt.with_options.push(next);
2291 }
2292 } else {
2293 if let Some(idx) = pos {
2294 stmt.with_options.swap_remove(idx);
2295 }
2296 }
2297 }
2298 _ => return Err(()),
2299 };
2300 Ok(())
2301 };
2302
2303 self.update_sql(update)?;
2304
2305 match self {
2307 CatalogItem::Source(source) => {
2308 match &mut source.data_source {
2309 DataSourceDesc::Ingestion { desc, .. }
2310 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
2311 desc.timestamp_interval = interval;
2312 }
2313 _ => return Err(()),
2314 }
2315 Ok(())
2316 }
2317 _ => Err(()),
2318 }
2319 }
2320
2321 pub fn add_column(
2322 &mut self,
2323 name: ColumnName,
2324 typ: SqlColumnType,
2325 sql: RawDataType,
2326 ) -> Result<RelationVersion, PlanError> {
2327 let CatalogItem::Table(table) = self else {
2328 return Err(PlanError::Unsupported {
2329 feature: "adding columns to a non-Table".to_string(),
2330 discussion_no: None,
2331 });
2332 };
2333 let next_version = table.desc.add_column(name.clone(), typ);
2334
2335 let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2336 Statement::CreateTable(stmt) => {
2337 let version = ColumnOptionDef {
2338 name: None,
2339 option: ColumnOption::Versioned {
2340 action: ColumnVersioned::Added,
2341 version: next_version.into(),
2342 },
2343 };
2344 let column = ColumnDef {
2345 name: name.into(),
2346 data_type: sql,
2347 collation: None,
2348 options: vec![version],
2349 };
2350 stmt.columns.push(column);
2351 Ok(())
2352 }
2353 _ => Err(()),
2354 };
2355
2356 self.update_sql(update)
2357 .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2358 Ok(next_version)
2359 }
2360
2361 pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2364 where
2365 F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2366 {
2367 let create_sql = match self {
2368 CatalogItem::Table(Table { create_sql, .. })
2369 | CatalogItem::Type(Type { create_sql, .. })
2370 | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2371 CatalogItem::Sink(Sink { create_sql, .. })
2372 | CatalogItem::View(View { create_sql, .. })
2373 | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2374 | CatalogItem::Index(Index { create_sql, .. })
2375 | CatalogItem::Secret(Secret { create_sql, .. })
2376 | CatalogItem::Connection(Connection { create_sql, .. }) => Some(create_sql),
2377 CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2378 };
2379 let Some(create_sql) = create_sql else {
2380 return Err(());
2381 };
2382 let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2383 .expect("non-system items must be parseable")
2384 .into_element()
2385 .ast;
2386 debug!("rewrite: {}", ast.to_ast_string_redacted());
2387 let t = f(&mut ast)?;
2388 *create_sql = ast.to_ast_string_stable();
2389 debug!("rewrote: {}", ast.to_ast_string_redacted());
2390 Ok(t)
2391 }
2392
2393 pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2400 match self {
2401 CatalogItem::Index(index) => Some(index.cluster_id),
2402 CatalogItem::Table(_)
2403 | CatalogItem::Source(_)
2404 | CatalogItem::Log(_)
2405 | CatalogItem::View(_)
2406 | CatalogItem::MaterializedView(_)
2407 | CatalogItem::Sink(_)
2408 | CatalogItem::Type(_)
2409 | CatalogItem::Func(_)
2410 | CatalogItem::Secret(_)
2411 | CatalogItem::Connection(_) => None,
2412 }
2413 }
2414
2415 pub fn cluster_id(&self) -> Option<ClusterId> {
2416 match self {
2417 CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2418 CatalogItem::Index(index) => Some(index.cluster_id),
2419 CatalogItem::Source(source) => match &source.data_source {
2420 DataSourceDesc::Ingestion { cluster_id, .. }
2421 | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
2422 DataSourceDesc::IngestionExport { .. } => None,
2426 DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2427 DataSourceDesc::Introspection(_)
2428 | DataSourceDesc::Progress
2429 | DataSourceDesc::Catalog => None,
2430 },
2431 CatalogItem::Sink(sink) => Some(sink.cluster_id),
2432 CatalogItem::Table(_)
2433 | CatalogItem::Log(_)
2434 | CatalogItem::View(_)
2435 | CatalogItem::Type(_)
2436 | CatalogItem::Func(_)
2437 | CatalogItem::Secret(_)
2438 | CatalogItem::Connection(_) => None,
2439 }
2440 }
2441
2442 pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2445 match self {
2446 CatalogItem::Table(table) => table.custom_logical_compaction_window,
2447 CatalogItem::Source(source) => source.custom_logical_compaction_window,
2448 CatalogItem::Index(index) => index.custom_logical_compaction_window,
2449 CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2450 CatalogItem::Log(_)
2451 | CatalogItem::View(_)
2452 | CatalogItem::Sink(_)
2453 | CatalogItem::Type(_)
2454 | CatalogItem::Func(_)
2455 | CatalogItem::Secret(_)
2456 | CatalogItem::Connection(_) => None,
2457 }
2458 }
2459
2460 pub fn custom_logical_compaction_window_mut(
2464 &mut self,
2465 ) -> Option<&mut Option<CompactionWindow>> {
2466 let cw = match self {
2467 CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2468 CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2469 CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2470 CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2471 CatalogItem::Log(_)
2472 | CatalogItem::View(_)
2473 | CatalogItem::Sink(_)
2474 | CatalogItem::Type(_)
2475 | CatalogItem::Func(_)
2476 | CatalogItem::Secret(_)
2477 | CatalogItem::Connection(_) => return None,
2478 };
2479 Some(cw)
2480 }
2481
2482 pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2490 let custom_logical_compaction_window = match self {
2491 CatalogItem::Table(_)
2492 | CatalogItem::Source(_)
2493 | CatalogItem::Index(_)
2494 | CatalogItem::MaterializedView(_) => self.custom_logical_compaction_window(),
2495 CatalogItem::Log(_)
2496 | CatalogItem::View(_)
2497 | CatalogItem::Sink(_)
2498 | CatalogItem::Type(_)
2499 | CatalogItem::Func(_)
2500 | CatalogItem::Secret(_)
2501 | CatalogItem::Connection(_) => return None,
2502 };
2503 Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2504 }
2505
2506 pub fn is_retained_metrics_object(&self) -> bool {
2510 match self {
2511 CatalogItem::Table(table) => table.is_retained_metrics_object,
2512 CatalogItem::Source(source) => source.is_retained_metrics_object,
2513 CatalogItem::Index(index) => index.is_retained_metrics_object,
2514 CatalogItem::Log(_)
2515 | CatalogItem::View(_)
2516 | CatalogItem::MaterializedView(_)
2517 | CatalogItem::Sink(_)
2518 | CatalogItem::Type(_)
2519 | CatalogItem::Func(_)
2520 | CatalogItem::Secret(_)
2521 | CatalogItem::Connection(_) => false,
2522 }
2523 }
2524
2525 pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2526 match self {
2527 CatalogItem::Table(table) => {
2528 let create_sql = table
2529 .create_sql
2530 .clone()
2531 .expect("builtin tables cannot be serialized");
2532 let mut collections = table.collections.clone();
2533 let global_id = collections
2534 .remove(&RelationVersion::root())
2535 .expect("at least one version");
2536 (create_sql, global_id, collections)
2537 }
2538 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2539 CatalogItem::Source(source) => {
2540 assert!(
2541 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2542 "cannot serialize introspection/builtin sources",
2543 );
2544 let create_sql = source
2545 .create_sql
2546 .clone()
2547 .expect("builtin sources cannot be serialized");
2548 (create_sql, source.global_id, BTreeMap::new())
2549 }
2550 CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2551 CatalogItem::MaterializedView(mview) => {
2552 let mut collections = mview.collections.clone();
2553 let global_id = collections
2554 .remove(&RelationVersion::root())
2555 .expect("at least one version");
2556 (mview.create_sql.clone(), global_id, collections)
2557 }
2558 CatalogItem::Index(index) => {
2559 (index.create_sql.clone(), index.global_id, BTreeMap::new())
2560 }
2561 CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2562 CatalogItem::Type(typ) => {
2563 let create_sql = typ
2564 .create_sql
2565 .clone()
2566 .expect("builtin types cannot be serialized");
2567 (create_sql, typ.global_id, BTreeMap::new())
2568 }
2569 CatalogItem::Secret(secret) => {
2570 (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2571 }
2572 CatalogItem::Connection(connection) => (
2573 connection.create_sql.clone(),
2574 connection.global_id,
2575 BTreeMap::new(),
2576 ),
2577 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2578 }
2579 }
2580
2581 pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2582 match self {
2583 CatalogItem::Table(mut table) => {
2584 let create_sql = table
2585 .create_sql
2586 .expect("builtin tables cannot be serialized");
2587 let global_id = table
2588 .collections
2589 .remove(&RelationVersion::root())
2590 .expect("at least one version");
2591 (create_sql, global_id, table.collections)
2592 }
2593 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2594 CatalogItem::Source(source) => {
2595 assert!(
2596 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2597 "cannot serialize introspection/builtin sources",
2598 );
2599 let create_sql = source
2600 .create_sql
2601 .expect("builtin sources cannot be serialized");
2602 (create_sql, source.global_id, BTreeMap::new())
2603 }
2604 CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2605 CatalogItem::MaterializedView(mut mview) => {
2606 let global_id = mview
2607 .collections
2608 .remove(&RelationVersion::root())
2609 .expect("at least one version");
2610 (mview.create_sql, global_id, mview.collections)
2611 }
2612 CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2613 CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2614 CatalogItem::Type(typ) => {
2615 let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2616 (create_sql, typ.global_id, BTreeMap::new())
2617 }
2618 CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2619 CatalogItem::Connection(connection) => {
2620 (connection.create_sql, connection.global_id, BTreeMap::new())
2621 }
2622 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2623 }
2624 }
2625
2626 pub fn global_id_for_version(&self, version: RelationVersionSelector) -> Option<GlobalId> {
2629 let collections = match self {
2630 CatalogItem::MaterializedView(mv) => &mv.collections,
2631 CatalogItem::Table(table) => &table.collections,
2632 CatalogItem::Source(source) => return Some(source.global_id),
2633 CatalogItem::Log(log) => return Some(log.global_id),
2634 CatalogItem::View(view) => return Some(view.global_id),
2635 CatalogItem::Sink(sink) => return Some(sink.global_id),
2636 CatalogItem::Index(index) => return Some(index.global_id),
2637 CatalogItem::Type(ty) => return Some(ty.global_id),
2638 CatalogItem::Func(func) => return Some(func.global_id),
2639 CatalogItem::Secret(secret) => return Some(secret.global_id),
2640 CatalogItem::Connection(conn) => return Some(conn.global_id),
2641 };
2642 match version {
2643 RelationVersionSelector::Latest => collections.values().last().copied(),
2644 RelationVersionSelector::Specific(version) => collections.get(&version).copied(),
2645 }
2646 }
2647}
2648
2649impl CatalogEntry {
2650 pub fn relation_desc_latest(&self) -> Option<Cow<'_, RelationDesc>> {
2653 self.item.relation_desc(RelationVersionSelector::Latest)
2654 }
2655
2656 pub fn has_columns(&self) -> bool {
2658 match self.item() {
2659 CatalogItem::Type(Type { details, .. }) => {
2660 matches!(details.typ, CatalogType::Record { .. })
2661 }
2662 _ => self.relation_desc_latest().is_some(),
2663 }
2664 }
2665
2666 pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2668 self.item.func(self)
2669 }
2670
2671 pub fn index(&self) -> Option<&Index> {
2673 match self.item() {
2674 CatalogItem::Index(idx) => Some(idx),
2675 _ => None,
2676 }
2677 }
2678
2679 pub fn materialized_view(&self) -> Option<&MaterializedView> {
2681 match self.item() {
2682 CatalogItem::MaterializedView(mv) => Some(mv),
2683 _ => None,
2684 }
2685 }
2686
2687 pub fn table(&self) -> Option<&Table> {
2689 match self.item() {
2690 CatalogItem::Table(tbl) => Some(tbl),
2691 _ => None,
2692 }
2693 }
2694
2695 pub fn source(&self) -> Option<&Source> {
2697 match self.item() {
2698 CatalogItem::Source(src) => Some(src),
2699 _ => None,
2700 }
2701 }
2702
2703 pub fn sink(&self) -> Option<&Sink> {
2705 match self.item() {
2706 CatalogItem::Sink(sink) => Some(sink),
2707 _ => None,
2708 }
2709 }
2710
2711 pub fn secret(&self) -> Option<&Secret> {
2713 match self.item() {
2714 CatalogItem::Secret(secret) => Some(secret),
2715 _ => None,
2716 }
2717 }
2718
2719 pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2720 match self.item() {
2721 CatalogItem::Connection(connection) => Ok(connection),
2722 _ => {
2723 let db_name = match self.name().qualifiers.database_spec {
2724 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2725 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2726 };
2727 Err(SqlCatalogError::UnknownConnection(format!(
2728 "{}{}.{}",
2729 db_name,
2730 self.name().qualifiers.schema_spec,
2731 self.name().item
2732 )))
2733 }
2734 }
2735 }
2736
2737 pub fn source_desc(
2740 &self,
2741 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2742 self.item.source_desc(self)
2743 }
2744
2745 pub fn is_connection(&self) -> bool {
2747 matches!(self.item(), CatalogItem::Connection(_))
2748 }
2749
2750 pub fn is_table(&self) -> bool {
2752 matches!(self.item(), CatalogItem::Table(_))
2753 }
2754
2755 pub fn is_source(&self) -> bool {
2758 matches!(self.item(), CatalogItem::Source(_))
2759 }
2760
2761 pub fn subsource_details(
2764 &self,
2765 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2766 match &self.item() {
2767 CatalogItem::Source(source) => match &source.data_source {
2768 DataSourceDesc::IngestionExport {
2769 ingestion_id,
2770 external_reference,
2771 details,
2772 data_config: _,
2773 } => Some((*ingestion_id, external_reference, details)),
2774 _ => None,
2775 },
2776 _ => None,
2777 }
2778 }
2779
2780 pub fn source_export_details(
2783 &self,
2784 ) -> Option<(
2785 CatalogItemId,
2786 &UnresolvedItemName,
2787 &SourceExportDetails,
2788 &SourceExportDataConfig<ReferencedConnection>,
2789 )> {
2790 match &self.item() {
2791 CatalogItem::Source(source) => match &source.data_source {
2792 DataSourceDesc::IngestionExport {
2793 ingestion_id,
2794 external_reference,
2795 details,
2796 data_config,
2797 } => Some((*ingestion_id, external_reference, details, data_config)),
2798 _ => None,
2799 },
2800 CatalogItem::Table(table) => match &table.data_source {
2801 TableDataSource::DataSource {
2802 desc:
2803 DataSourceDesc::IngestionExport {
2804 ingestion_id,
2805 external_reference,
2806 details,
2807 data_config,
2808 },
2809 timeline: _,
2810 } => Some((*ingestion_id, external_reference, details, data_config)),
2811 _ => None,
2812 },
2813 _ => None,
2814 }
2815 }
2816
2817 pub fn is_progress_source(&self) -> bool {
2819 self.item().is_progress_source()
2820 }
2821
2822 pub fn progress_id(&self) -> Option<CatalogItemId> {
2824 match &self.item() {
2825 CatalogItem::Source(source) => match &source.data_source {
2826 DataSourceDesc::Ingestion { .. } => Some(self.id),
2827 DataSourceDesc::OldSyntaxIngestion {
2828 progress_subsource, ..
2829 } => Some(*progress_subsource),
2830 DataSourceDesc::IngestionExport { .. }
2831 | DataSourceDesc::Introspection(_)
2832 | DataSourceDesc::Progress
2833 | DataSourceDesc::Webhook { .. }
2834 | DataSourceDesc::Catalog => None,
2835 },
2836 CatalogItem::Table(_)
2837 | CatalogItem::Log(_)
2838 | CatalogItem::View(_)
2839 | CatalogItem::MaterializedView(_)
2840 | CatalogItem::Sink(_)
2841 | CatalogItem::Index(_)
2842 | CatalogItem::Type(_)
2843 | CatalogItem::Func(_)
2844 | CatalogItem::Secret(_)
2845 | CatalogItem::Connection(_) => None,
2846 }
2847 }
2848
2849 pub fn is_sink(&self) -> bool {
2851 matches!(self.item(), CatalogItem::Sink(_))
2852 }
2853
2854 pub fn is_materialized_view(&self) -> bool {
2856 matches!(self.item(), CatalogItem::MaterializedView(_))
2857 }
2858
2859 pub fn is_view(&self) -> bool {
2861 matches!(self.item(), CatalogItem::View(_))
2862 }
2863
2864 pub fn is_secret(&self) -> bool {
2866 matches!(self.item(), CatalogItem::Secret(_))
2867 }
2868
2869 pub fn is_introspection_source(&self) -> bool {
2871 matches!(self.item(), CatalogItem::Log(_))
2872 }
2873
2874 pub fn is_index(&self) -> bool {
2876 matches!(self.item(), CatalogItem::Index(_))
2877 }
2878
2879 pub fn is_relation(&self) -> bool {
2881 mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2882 }
2883
2884 pub fn references(&self) -> &ResolvedIds {
2887 self.item.references()
2888 }
2889
2890 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2896 self.item.uses()
2897 }
2898
2899 pub fn item(&self) -> &CatalogItem {
2901 &self.item
2902 }
2903
2904 pub fn item_mut(&mut self) -> &mut CatalogItem {
2907 &mut self.item
2908 }
2909
2910 pub fn id(&self) -> CatalogItemId {
2912 self.id
2913 }
2914
2915 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2917 self.item().global_ids()
2918 }
2919
2920 pub fn latest_global_id(&self) -> GlobalId {
2921 self.item().latest_global_id()
2922 }
2923
2924 pub fn oid(&self) -> u32 {
2926 self.oid
2927 }
2928
2929 pub fn name(&self) -> &QualifiedItemName {
2931 &self.name
2932 }
2933
2934 pub fn referenced_by(&self) -> &[CatalogItemId] {
2936 &self.referenced_by
2937 }
2938
2939 pub fn used_by(&self) -> &[CatalogItemId] {
2941 &self.used_by
2942 }
2943
2944 pub fn conn_id(&self) -> Option<&ConnectionId> {
2947 self.item.conn_id()
2948 }
2949
2950 pub fn owner_id(&self) -> &RoleId {
2952 &self.owner_id
2953 }
2954
2955 pub fn privileges(&self) -> &PrivilegeMap {
2957 &self.privileges
2958 }
2959
2960 pub fn comment_object_id(&self) -> CommentObjectId {
2962 use CatalogItemType::*;
2963 match self.item_type() {
2964 Table => CommentObjectId::Table(self.id),
2965 Source => CommentObjectId::Source(self.id),
2966 Sink => CommentObjectId::Sink(self.id),
2967 View => CommentObjectId::View(self.id),
2968 MaterializedView => CommentObjectId::MaterializedView(self.id),
2969 Index => CommentObjectId::Index(self.id),
2970 Func => CommentObjectId::Func(self.id),
2971 Connection => CommentObjectId::Connection(self.id),
2972 Type => CommentObjectId::Type(self.id),
2973 Secret => CommentObjectId::Secret(self.id),
2974 }
2975 }
2976}
2977
/// In-memory store of comments attached to catalog objects.
#[derive(Debug, Clone, Default)]
pub struct CommentsMap {
    /// Comments keyed first by the commented object and then by an optional
    /// sub-component index (`None` keys the comment on the object itself).
    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
}
2982
2983impl CommentsMap {
2984 pub fn update_comment(
2985 &mut self,
2986 object_id: CommentObjectId,
2987 sub_component: Option<usize>,
2988 comment: Option<String>,
2989 ) -> Option<String> {
2990 let object_comments = self.map.entry(object_id).or_default();
2991
2992 let (empty, prev) = if let Some(comment) = comment {
2994 let prev = object_comments.insert(sub_component, comment);
2995 (false, prev)
2996 } else {
2997 let prev = object_comments.remove(&sub_component);
2998 (object_comments.is_empty(), prev)
2999 };
3000
3001 if empty {
3003 self.map.remove(&object_id);
3004 }
3005
3006 prev
3008 }
3009
3010 pub fn drop_comments(
3016 &mut self,
3017 object_ids: &BTreeSet<CommentObjectId>,
3018 ) -> Vec<(CommentObjectId, Option<usize>, String)> {
3019 let mut removed_comments = Vec::new();
3020
3021 for object_id in object_ids {
3022 if let Some(comments) = self.map.remove(object_id) {
3023 let removed = comments
3024 .into_iter()
3025 .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
3026 removed_comments.extend(removed);
3027 }
3028 }
3029
3030 removed_comments
3031 }
3032
3033 pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
3034 self.map
3035 .iter()
3036 .map(|(id, comments)| {
3037 comments
3038 .iter()
3039 .map(|(pos, comment)| (*id, *pos, comment.as_str()))
3040 })
3041 .flatten()
3042 }
3043
3044 pub fn get_object_comments(
3045 &self,
3046 object_id: CommentObjectId,
3047 ) -> Option<&BTreeMap<Option<usize>, String>> {
3048 self.map.get(&object_id)
3049 }
3050}
3051
3052impl Serialize for CommentsMap {
3053 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
3054 where
3055 S: serde::Serializer,
3056 {
3057 let comment_count = self
3058 .map
3059 .iter()
3060 .map(|(_object_id, comments)| comments.len())
3061 .sum();
3062
3063 let mut seq = serializer.serialize_seq(Some(comment_count))?;
3064 for (object_id, sub) in &self.map {
3065 for (sub_component, comment) in sub {
3066 seq.serialize_element(&(
3067 format!("{object_id:?}"),
3068 format!("{sub_component:?}"),
3069 comment,
3070 ))?;
3071 }
3072 }
3073 seq.end()
3074 }
3075}
3076
/// The default privileges known to the catalog.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
pub struct DefaultPrivileges {
    /// Per-grantee default privileges, keyed by the object description they
    /// apply to. Keys are serialized as strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
}
3082
/// Default privileges for a single `DefaultPrivilegeObject`, keyed by the
/// grantee role.
///
/// Wrapped in a newtype so the map can carry its own `serde` attribute; the
/// `Deref`/`DerefMut` impls expose the inner map's API.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
struct RoleDefaultPrivileges(
    /// Map from grantee to the ACL item granted by default. Keys are
    /// serialized as strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
);
3091
/// Gives read access to the wrapped grantee map.
impl Deref for RoleDefaultPrivileges {
    type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;

    /// Borrows the inner map.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
3099
/// Gives write access to the wrapped grantee map.
impl DerefMut for RoleDefaultPrivileges {
    /// Mutably borrows the inner map.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
3105
3106impl DefaultPrivileges {
3107 pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
3109 if privilege.acl_mode.is_empty() {
3110 return;
3111 }
3112
3113 let privileges = self.privileges.entry(object).or_default();
3114 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3115 default_privilege.acl_mode |= privilege.acl_mode;
3116 } else {
3117 privileges.insert(privilege.grantee, privilege);
3118 }
3119 }
3120
3121 pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
3123 if let Some(privileges) = self.privileges.get_mut(object) {
3124 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3125 default_privilege.acl_mode =
3126 default_privilege.acl_mode.difference(privilege.acl_mode);
3127 if default_privilege.acl_mode.is_empty() {
3128 privileges.remove(&privilege.grantee);
3129 }
3130 }
3131 if privileges.is_empty() {
3132 self.privileges.remove(object);
3133 }
3134 }
3135 }
3136
3137 pub fn get_privileges_for_grantee(
3140 &self,
3141 object: &DefaultPrivilegeObject,
3142 grantee: &RoleId,
3143 ) -> Option<&AclMode> {
3144 self.privileges
3145 .get(object)
3146 .and_then(|privileges| privileges.get(grantee))
3147 .map(|privilege| &privilege.acl_mode)
3148 }
3149
3150 pub fn get_applicable_privileges(
3152 &self,
3153 role_id: RoleId,
3154 database_id: Option<DatabaseId>,
3155 schema_id: Option<SchemaId>,
3156 object_type: mz_sql::catalog::ObjectType,
3157 ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
3158 let privilege_object_type = if object_type.is_relation() {
3162 mz_sql::catalog::ObjectType::Table
3163 } else {
3164 object_type
3165 };
3166 let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
3167
3168 [
3172 DefaultPrivilegeObject {
3173 role_id,
3174 database_id,
3175 schema_id,
3176 object_type: privilege_object_type,
3177 },
3178 DefaultPrivilegeObject {
3179 role_id,
3180 database_id,
3181 schema_id: None,
3182 object_type: privilege_object_type,
3183 },
3184 DefaultPrivilegeObject {
3185 role_id,
3186 database_id: None,
3187 schema_id: None,
3188 object_type: privilege_object_type,
3189 },
3190 DefaultPrivilegeObject {
3191 role_id: RoleId::Public,
3192 database_id,
3193 schema_id,
3194 object_type: privilege_object_type,
3195 },
3196 DefaultPrivilegeObject {
3197 role_id: RoleId::Public,
3198 database_id,
3199 schema_id: None,
3200 object_type: privilege_object_type,
3201 },
3202 DefaultPrivilegeObject {
3203 role_id: RoleId::Public,
3204 database_id: None,
3205 schema_id: None,
3206 object_type: privilege_object_type,
3207 },
3208 ]
3209 .into_iter()
3210 .filter_map(|object| self.privileges.get(&object))
3211 .flat_map(|acl_map| acl_map.values())
3212 .fold(
3214 BTreeMap::new(),
3215 |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
3216 let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
3217 *accum_acl_mode |= *acl_mode;
3218 accum
3219 },
3220 )
3221 .into_iter()
3222 .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
3227 .filter(|(_, acl_mode)| !acl_mode.is_empty())
3229 .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
3230 grantee: *grantee,
3231 acl_mode,
3232 })
3233 }
3234
3235 pub fn iter(
3236 &self,
3237 ) -> impl Iterator<
3238 Item = (
3239 &DefaultPrivilegeObject,
3240 impl Iterator<Item = &DefaultPrivilegeAclItem>,
3241 ),
3242 > {
3243 self.privileges
3244 .iter()
3245 .map(|(object, acl_map)| (object, acl_map.values()))
3246 }
3247}
3248
/// In-memory configuration of a cluster.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterConfig {
    /// Whether the cluster is managed or unmanaged, plus the settings of a
    /// managed cluster.
    pub variant: ClusterVariant,
    /// Optional workload class label attached to the cluster.
    pub workload_class: Option<String>,
}
3254
3255impl ClusterConfig {
3256 pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
3257 match &self.variant {
3258 ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
3259 ClusterVariant::Unmanaged => None,
3260 }
3261 }
3262}
3263
3264impl From<ClusterConfig> for durable::ClusterConfig {
3265 fn from(config: ClusterConfig) -> Self {
3266 Self {
3267 variant: config.variant.into(),
3268 workload_class: config.workload_class,
3269 }
3270 }
3271}
3272
3273impl From<durable::ClusterConfig> for ClusterConfig {
3274 fn from(config: durable::ClusterConfig) -> Self {
3275 Self {
3276 variant: config.variant.into(),
3277 workload_class: config.workload_class,
3278 }
3279 }
3280}
3281
/// Settings of a managed cluster.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterVariantManaged {
    /// Name of the replica size used by the cluster.
    pub size: String,
    /// Availability zones configured for the cluster.
    pub availability_zones: Vec<String>,
    /// Logging configuration for the cluster's replicas.
    pub logging: ReplicaLogging,
    /// Configured replication factor of the cluster.
    pub replication_factor: u32,
    /// Optimizer feature overrides configured on the cluster.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// Schedule configured for the cluster.
    pub schedule: ClusterSchedule,
}
3291
3292impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3293 fn from(managed: ClusterVariantManaged) -> Self {
3294 Self {
3295 size: managed.size,
3296 availability_zones: managed.availability_zones,
3297 logging: managed.logging,
3298 replication_factor: managed.replication_factor,
3299 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3300 schedule: managed.schedule,
3301 }
3302 }
3303}
3304
3305impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3306 fn from(managed: durable::ClusterVariantManaged) -> Self {
3307 Self {
3308 size: managed.size,
3309 availability_zones: managed.availability_zones,
3310 logging: managed.logging,
3311 replication_factor: managed.replication_factor,
3312 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3313 schedule: managed.schedule,
3314 }
3315 }
3316}
3317
/// Distinguishes managed from unmanaged clusters.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterVariant {
    /// A managed cluster, together with its settings.
    Managed(ClusterVariantManaged),
    /// An unmanaged cluster; it carries no cluster-level settings here.
    Unmanaged,
}
3323
3324impl From<ClusterVariant> for durable::ClusterVariant {
3325 fn from(variant: ClusterVariant) -> Self {
3326 match variant {
3327 ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3328 ClusterVariant::Unmanaged => Self::Unmanaged,
3329 }
3330 }
3331}
3332
3333impl From<durable::ClusterVariant> for ClusterVariant {
3334 fn from(variant: durable::ClusterVariant) -> Self {
3335 match variant {
3336 durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3337 durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3338 }
3339 }
3340}
3341
3342impl mz_sql::catalog::CatalogDatabase for Database {
3343 fn name(&self) -> &str {
3344 &self.name
3345 }
3346
3347 fn id(&self) -> DatabaseId {
3348 self.id
3349 }
3350
3351 fn has_schemas(&self) -> bool {
3352 !self.schemas_by_name.is_empty()
3353 }
3354
3355 fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
3356 &self.schemas_by_name
3357 }
3358
3359 #[allow(clippy::as_conversions)]
3361 fn schemas(&self) -> Vec<&dyn CatalogSchema> {
3362 self.schemas_by_id
3363 .values()
3364 .map(|schema| schema as &dyn CatalogSchema)
3365 .collect()
3366 }
3367
3368 fn owner_id(&self) -> RoleId {
3369 self.owner_id
3370 }
3371
3372 fn privileges(&self) -> &PrivilegeMap {
3373 &self.privileges
3374 }
3375}
3376
3377impl mz_sql::catalog::CatalogSchema for Schema {
3378 fn database(&self) -> &ResolvedDatabaseSpecifier {
3379 &self.name.database
3380 }
3381
3382 fn name(&self) -> &QualifiedSchemaName {
3383 &self.name
3384 }
3385
3386 fn id(&self) -> &SchemaSpecifier {
3387 &self.id
3388 }
3389
3390 fn has_items(&self) -> bool {
3391 !self.items.is_empty()
3392 }
3393
3394 fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
3395 Box::new(
3396 self.items
3397 .values()
3398 .chain(self.functions.values())
3399 .chain(self.types.values())
3400 .copied(),
3401 )
3402 }
3403
3404 fn owner_id(&self) -> RoleId {
3405 self.owner_id
3406 }
3407
3408 fn privileges(&self) -> &PrivilegeMap {
3409 &self.privileges
3410 }
3411}
3412
/// Exposes a [`Role`] through the SQL layer's `CatalogRole` interface. All
/// methods are plain field accessors.
impl mz_sql::catalog::CatalogRole for Role {
    /// Returns the role's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the role's ID.
    fn id(&self) -> RoleId {
        self.id
    }

    /// Returns the role's membership map.
    // NOTE(review): the meaning of the map's key and value roles is not
    // visible here. Confirm against `RoleMembership` before relying on it.
    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
        &self.membership.map
    }

    /// Returns the role's attributes.
    fn attributes(&self) -> &RoleAttributes {
        &self.attributes
    }

    /// Returns the role's variable settings, keyed by variable name.
    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
        &self.vars.map
    }
}
3434
/// Exposes a [`NetworkPolicy`] through the SQL layer's `CatalogNetworkPolicy`
/// interface. All methods are plain field accessors.
impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
    /// Returns the network policy's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the network policy's ID.
    fn id(&self) -> NetworkPolicyId {
        self.id
    }

    /// Returns the role that owns the network policy.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges granted on the network policy.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3452
/// Exposes a [`Cluster`] through the SQL layer's `CatalogCluster` interface.
///
/// Several methods below call a method of the same name on `self`
/// (`replicas`, `replica`, `is_managed`, `try_to_plan`).
// NOTE(review): these presumably resolve to inherent methods on `Cluster`
// defined outside this chunk; if they resolved to the trait methods they
// would recurse. Confirm before renaming or removing the inherent methods.
impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
    /// Returns the cluster's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the cluster's ID.
    fn id(&self) -> ClusterId {
        self.id
    }

    /// Returns the IDs of the catalog items bound to this cluster.
    fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
        &self.bound_objects
    }

    /// Returns the name-to-ID mapping of the cluster's replicas.
    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
        &self.replica_id_by_name_
    }

    /// Returns all replicas of the cluster as `CatalogClusterReplica` trait
    /// objects.
    // The `as` cast below only performs the unsizing coercion to a trait
    // object, hence the lint exemption.
    #[allow(clippy::as_conversions)]
    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
        self.replicas()
            .map(|replica| replica as &dyn CatalogClusterReplica)
            .collect()
    }

    /// Returns the replica with the given ID.
    ///
    /// # Panics
    ///
    /// Panics with "catalog out of sync" if the lookup for `id` fails.
    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
        self.replica(id).expect("catalog out of sync")
    }

    /// Returns the role that owns the cluster.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges granted on the cluster.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }

    /// Reports whether the cluster is managed.
    fn is_managed(&self) -> bool {
        self.is_managed()
    }

    /// Returns the configured size of a managed cluster, or `None` for an
    /// unmanaged cluster.
    fn managed_size(&self) -> Option<&str> {
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
            ClusterVariant::Unmanaged => None,
        }
    }

    /// Returns the schedule of a managed cluster, or `None` for an unmanaged
    /// cluster.
    fn schedule(&self) -> Option<&ClusterSchedule> {
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
            ClusterVariant::Unmanaged => None,
        }
    }

    /// Attempts to build a `CreateClusterPlan` for this cluster, forwarding
    /// any `PlanError` from the underlying call.
    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
        self.try_to_plan()
    }
}
3512
/// Exposes a [`ClusterReplica`] through the SQL layer's
/// `CatalogClusterReplica` interface.
impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
    /// Returns the replica's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the ID of the cluster this replica belongs to.
    fn cluster_id(&self) -> ClusterId {
        self.cluster_id
    }

    /// Returns the replica's ID.
    fn replica_id(&self) -> ReplicaId {
        self.replica_id
    }

    /// Returns the role that owns the replica.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Reports whether the replica is internal, as determined by its
    /// configured location.
    fn internal(&self) -> bool {
        self.config.location.internal()
    }
}
3534
3535impl mz_sql::catalog::CatalogItem for CatalogEntry {
3536 fn name(&self) -> &QualifiedItemName {
3537 self.name()
3538 }
3539
3540 fn id(&self) -> CatalogItemId {
3541 self.id()
3542 }
3543
3544 fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3545 Box::new(self.global_ids())
3546 }
3547
3548 fn oid(&self) -> u32 {
3549 self.oid()
3550 }
3551
3552 fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3553 self.func()
3554 }
3555
3556 fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3557 self.source_desc()
3558 }
3559
3560 fn connection(
3561 &self,
3562 ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3563 {
3564 Ok(self.connection()?.details.to_connection())
3565 }
3566
3567 fn create_sql(&self) -> &str {
3568 match self.item() {
3569 CatalogItem::Table(Table { create_sql, .. }) => {
3570 create_sql.as_deref().unwrap_or("<builtin>")
3571 }
3572 CatalogItem::Source(Source { create_sql, .. }) => {
3573 create_sql.as_deref().unwrap_or("<builtin>")
3574 }
3575 CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3576 CatalogItem::View(View { create_sql, .. }) => create_sql,
3577 CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3578 CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3579 CatalogItem::Type(Type { create_sql, .. }) => {
3580 create_sql.as_deref().unwrap_or("<builtin>")
3581 }
3582 CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3583 CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3584 CatalogItem::Func(_) => "<builtin>",
3585 CatalogItem::Log(_) => "<builtin>",
3586 }
3587 }
3588
3589 fn item_type(&self) -> SqlCatalogItemType {
3590 self.item().typ()
3591 }
3592
3593 fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3594 if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3595 Some((keys, *on))
3596 } else {
3597 None
3598 }
3599 }
3600
3601 fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3602 if let CatalogItem::Table(Table {
3603 data_source: TableDataSource::TableWrites { defaults },
3604 ..
3605 }) = self.item()
3606 {
3607 Some(defaults.as_slice())
3608 } else {
3609 None
3610 }
3611 }
3612
3613 fn replacement_target(&self) -> Option<CatalogItemId> {
3614 if let CatalogItem::MaterializedView(mv) = self.item() {
3615 mv.replacement_target
3616 } else {
3617 None
3618 }
3619 }
3620
3621 fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3622 if let CatalogItem::Type(Type { details, .. }) = self.item() {
3623 Some(details)
3624 } else {
3625 None
3626 }
3627 }
3628
3629 fn references(&self) -> &ResolvedIds {
3630 self.references()
3631 }
3632
3633 fn uses(&self) -> BTreeSet<CatalogItemId> {
3634 self.uses()
3635 }
3636
3637 fn referenced_by(&self) -> &[CatalogItemId] {
3638 self.referenced_by()
3639 }
3640
3641 fn used_by(&self) -> &[CatalogItemId] {
3642 self.used_by()
3643 }
3644
3645 fn subsource_details(
3646 &self,
3647 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3648 self.subsource_details()
3649 }
3650
3651 fn source_export_details(
3652 &self,
3653 ) -> Option<(
3654 CatalogItemId,
3655 &UnresolvedItemName,
3656 &SourceExportDetails,
3657 &SourceExportDataConfig<ReferencedConnection>,
3658 )> {
3659 self.source_export_details()
3660 }
3661
3662 fn is_progress_source(&self) -> bool {
3663 self.is_progress_source()
3664 }
3665
3666 fn progress_id(&self) -> Option<CatalogItemId> {
3667 self.progress_id()
3668 }
3669
3670 fn owner_id(&self) -> RoleId {
3671 self.owner_id
3672 }
3673
3674 fn privileges(&self) -> &PrivilegeMap {
3675 &self.privileges
3676 }
3677
3678 fn cluster_id(&self) -> Option<ClusterId> {
3679 self.item().cluster_id()
3680 }
3681
3682 fn at_version(
3683 &self,
3684 version: RelationVersionSelector,
3685 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3686 Box::new(CatalogCollectionEntry {
3687 entry: self.clone(),
3688 version,
3689 })
3690 }
3691
3692 fn latest_version(&self) -> Option<RelationVersion> {
3693 self.table().map(|t| t.desc.latest_version())
3694 }
3695}
3696
/// A single update to the in-memory catalog state.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdate {
    /// The object the update is about.
    pub kind: StateUpdateKind,
    /// The timestamp associated with the update.
    pub ts: Timestamp,
    /// Whether the object is being added or retracted.
    pub diff: StateDiff,
}
3704
/// The kinds of objects a [`StateUpdate`] can carry.
///
/// Most variants wrap the corresponding durable catalog object. The
/// exceptions are `SystemPrivilege`, which wraps an `MzAclItem`, and
/// `TemporaryItem`, which wraps the in-memory [`TemporaryItem`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum StateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    /// An item described by the in-memory `TemporaryItem` type rather than a
    /// durable object.
    TemporaryItem(TemporaryItem),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3734
/// The direction of a state update: either a retraction or an addition.
///
/// Converts into `Diff::MINUS_ONE` and `Diff::ONE` respectively. Under the derived
/// ordering `Retraction` compares less than `Addition`, because it is declared first.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub enum StateDiff {
    /// The associated state is being removed.
    Retraction,
    /// The associated state is being added.
    Addition,
}
3741
3742impl From<StateDiff> for Diff {
3743 fn from(diff: StateDiff) -> Self {
3744 match diff {
3745 StateDiff::Retraction => Diff::MINUS_ONE,
3746 StateDiff::Addition => Diff::ONE,
3747 }
3748 }
3749}
3750impl TryFrom<Diff> for StateDiff {
3751 type Error = String;
3752
3753 fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3754 match diff {
3755 Diff::MINUS_ONE => Ok(Self::Retraction),
3756 Diff::ONE => Ok(Self::Addition),
3757 diff => Err(format!("invalid diff {diff}")),
3758 }
3759 }
3760}
3761
/// A flattened form of a `CatalogEntry`, carried by [`StateUpdateKind::TemporaryItem`].
///
/// Built from a `CatalogEntry` via `From`. It has no counterpart in
/// [`BootstrapStateUpdateKind`]: converting a `TemporaryItem` update into that type hands
/// this value back as the error.
#[derive(Debug, Clone, Ord, PartialOrd, PartialEq, Eq)]
pub struct TemporaryItem {
    /// The catalog item id of the entry.
    pub id: CatalogItemId,
    /// The OID of the entry.
    pub oid: u32,
    /// The global id produced when the entry's item is serialized.
    pub global_id: GlobalId,
    /// The schema taken from the entry's qualified name.
    pub schema_id: SchemaId,
    /// The item component of the entry's qualified name.
    pub name: String,
    /// The connection id reported by the entry, if any.
    // NOTE(review): presumably the session that owns the temporary item — confirm.
    pub conn_id: Option<ConnectionId>,
    /// The SQL text produced when the entry's item is serialized; `item_type` derives the
    /// item type from it.
    pub create_sql: String,
    /// The role that owns the entry.
    pub owner_id: RoleId,
    /// Every value of the entry's privilege map, collected into a flat list.
    pub privileges: Vec<MzAclItem>,
    /// Additional global ids keyed by relation version, as produced by serialization.
    pub extra_versions: BTreeMap<RelationVersion, GlobalId>,
}
3776
3777impl From<CatalogEntry> for TemporaryItem {
3778 fn from(entry: CatalogEntry) -> Self {
3779 let conn_id = entry.conn_id().cloned();
3780 let (create_sql, global_id, extra_versions) = entry.item.to_serialized();
3781
3782 TemporaryItem {
3783 id: entry.id,
3784 oid: entry.oid,
3785 global_id,
3786 schema_id: entry.name.qualifiers.schema_spec.into(),
3787 name: entry.name.item,
3788 conn_id,
3789 create_sql,
3790 owner_id: entry.owner_id,
3791 privileges: entry.privileges.into_all_values().collect(),
3792 extra_versions,
3793 }
3794 }
3795}
3796
3797impl TemporaryItem {
3798 pub fn item_type(&self) -> CatalogItemType {
3799 item_type(&self.create_sql)
3800 }
3801}
3802
/// The subset of [`StateUpdateKind`] that excludes `TemporaryItem`.
///
/// Each variant has a same-named variant in [`StateUpdateKind`] carrying the same payload
/// type. Converting to [`StateUpdateKind`] always succeeds; converting back fails only for
/// `StateUpdateKind::TemporaryItem`.
///
/// Variant order is significant: the derived `PartialOrd`/`Ord` rank variants by
/// declaration order.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum BootstrapStateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3826
3827impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3828 fn from(value: BootstrapStateUpdateKind) -> Self {
3829 match value {
3830 BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3831 BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3832 BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3833 BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3834 BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3835 StateUpdateKind::DefaultPrivilege(kind)
3836 }
3837 BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3838 StateUpdateKind::SystemPrivilege(kind)
3839 }
3840 BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3841 StateUpdateKind::SystemConfiguration(kind)
3842 }
3843 BootstrapStateUpdateKind::SourceReferences(kind) => {
3844 StateUpdateKind::SourceReferences(kind)
3845 }
3846 BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3847 BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3848 BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3849 StateUpdateKind::IntrospectionSourceIndex(kind)
3850 }
3851 BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3852 BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3853 StateUpdateKind::SystemObjectMapping(kind)
3854 }
3855 BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3856 BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3857 BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3858 BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3859 StateUpdateKind::StorageCollectionMetadata(kind)
3860 }
3861 BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3862 StateUpdateKind::UnfinalizedShard(kind)
3863 }
3864 }
3865 }
3866}
3867
3868impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3869 type Error = TemporaryItem;
3870
3871 fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3872 match value {
3873 StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3874 StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3875 StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3876 StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3877 StateUpdateKind::DefaultPrivilege(kind) => {
3878 Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3879 }
3880 StateUpdateKind::SystemPrivilege(kind) => {
3881 Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3882 }
3883 StateUpdateKind::SystemConfiguration(kind) => {
3884 Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3885 }
3886 StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3887 StateUpdateKind::NetworkPolicy(kind) => {
3888 Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3889 }
3890 StateUpdateKind::IntrospectionSourceIndex(kind) => {
3891 Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3892 }
3893 StateUpdateKind::ClusterReplica(kind) => {
3894 Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3895 }
3896 StateUpdateKind::SourceReferences(kind) => {
3897 Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3898 }
3899 StateUpdateKind::SystemObjectMapping(kind) => {
3900 Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3901 }
3902 StateUpdateKind::TemporaryItem(kind) => Err(kind),
3903 StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3904 StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3905 StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3906 StateUpdateKind::StorageCollectionMetadata(kind) => {
3907 Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3908 }
3909 StateUpdateKind::UnfinalizedShard(kind) => {
3910 Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3911 }
3912 }
3913 }
3914}