1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_compute_client::logging::LogVariant;
24use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
25use mz_controller_types::{ClusterId, ReplicaId};
26use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
27use mz_ore::collections::CollectionExt;
28use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
29use mz_repr::network_policy_id::NetworkPolicyId;
30use mz_repr::optimize::OptimizerFeatureOverrides;
31use mz_repr::refresh_schedule::RefreshSchedule;
32use mz_repr::role_id::RoleId;
33use mz_repr::{
34 CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
35 RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
36};
37use mz_sql::ast::display::AstDisplay;
38use mz_sql::ast::{
39 ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
40 UnresolvedItemName, Value, WithOptionValue,
41};
42use mz_sql::catalog::{
43 CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
44 CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogType,
45 CatalogTypeDetails, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference,
46 RoleAttributes, RoleMembership, RoleVars, SystemObjectType,
47};
48use mz_sql::names::{
49 Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
50 QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
51};
52use mz_sql::plan::{
53 ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
54 CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
55 HirRelationExpr, NetworkPolicyRule, PlanError, WebhookBodyFormat, WebhookHeaders,
56 WebhookValidation,
57};
58use mz_sql::rbac;
59use mz_sql::session::vars::OwnedVarInput;
60use mz_storage_client::controller::IntrospectionType;
61use mz_storage_types::connections::inline::ReferencedConnection;
62use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
63use mz_storage_types::sources::load_generator::LoadGenerator;
64use mz_storage_types::sources::{
65 GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
66 SourceExportDetails, Timeline,
67};
68use serde::ser::SerializeSeq;
69use serde::{Deserialize, Serialize};
70use timely::progress::Antichain;
71use tracing::debug;
72
73use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
74use crate::durable;
75use crate::durable::objects::item_type;
76
/// Updates an existing value in place from a `T`, consuming the `T`.
///
/// The `From<T>` supertrait means every implementor can also be constructed
/// from scratch out of a `T`.
pub trait UpdateFrom<T>: From<T> {
    /// Overwrites state of `self` with the contents of `from`.
    fn update_from(&mut self, from: T);
}
81
/// In-memory representation of a database together with lookup maps for its
/// schemas.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    pub name: String,
    pub id: DatabaseId,
    pub oid: u32,
    /// Schemas keyed by [`SchemaId`]; serialized through
    /// `mz_ore::serde::map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Schema ids keyed by a string (presumably the schema name — confirm).
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
93
94impl From<Database> for durable::Database {
95 fn from(database: Database) -> durable::Database {
96 durable::Database {
97 id: database.id,
98 oid: database.oid,
99 name: database.name,
100 owner_id: database.owner_id,
101 privileges: database.privileges.into_all_values().collect(),
102 }
103 }
104}
105
106impl From<durable::Database> for Database {
107 fn from(
108 durable::Database {
109 id,
110 oid,
111 name,
112 owner_id,
113 privileges,
114 }: durable::Database,
115 ) -> Database {
116 Database {
117 id,
118 oid,
119 schemas_by_id: BTreeMap::new(),
120 schemas_by_name: BTreeMap::new(),
121 name,
122 owner_id,
123 privileges: PrivilegeMap::from_mz_acl_items(privileges),
124 }
125 }
126}
127
128impl UpdateFrom<durable::Database> for Database {
129 fn update_from(
130 &mut self,
131 durable::Database {
132 id,
133 oid,
134 name,
135 owner_id,
136 privileges,
137 }: durable::Database,
138 ) {
139 self.id = id;
140 self.oid = oid;
141 self.name = name;
142 self.owner_id = owner_id;
143 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
144 }
145}
146
/// In-memory representation of a schema together with lookup maps for the
/// objects it contains.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
    /// Database specifier plus schema name.
    pub name: QualifiedSchemaName,
    pub id: SchemaSpecifier,
    pub oid: u32,
    /// Item ids keyed by a string (presumably the item name — confirm).
    pub items: BTreeMap<String, CatalogItemId>,
    /// Function ids keyed by a string (presumably the function name — confirm).
    pub functions: BTreeMap<String, CatalogItemId>,
    /// Type ids keyed by a string (presumably the type name — confirm).
    pub types: BTreeMap<String, CatalogItemId>,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
158
159impl From<Schema> for durable::Schema {
160 fn from(schema: Schema) -> durable::Schema {
161 durable::Schema {
162 id: schema.id.into(),
163 oid: schema.oid,
164 name: schema.name.schema,
165 database_id: schema.name.database.id(),
166 owner_id: schema.owner_id,
167 privileges: schema.privileges.into_all_values().collect(),
168 }
169 }
170}
171
172impl From<durable::Schema> for Schema {
173 fn from(
174 durable::Schema {
175 id,
176 oid,
177 name,
178 database_id,
179 owner_id,
180 privileges,
181 }: durable::Schema,
182 ) -> Schema {
183 Schema {
184 name: QualifiedSchemaName {
185 database: database_id.into(),
186 schema: name,
187 },
188 id: id.into(),
189 oid,
190 items: BTreeMap::new(),
191 functions: BTreeMap::new(),
192 types: BTreeMap::new(),
193 owner_id,
194 privileges: PrivilegeMap::from_mz_acl_items(privileges),
195 }
196 }
197}
198
199impl UpdateFrom<durable::Schema> for Schema {
200 fn update_from(
201 &mut self,
202 durable::Schema {
203 id,
204 oid,
205 name,
206 database_id,
207 owner_id,
208 privileges,
209 }: durable::Schema,
210 ) {
211 self.name = QualifiedSchemaName {
212 database: database_id.into(),
213 schema: name,
214 };
215 self.id = id.into();
216 self.oid = oid;
217 self.owner_id = owner_id;
218 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
219 }
220}
221
/// In-memory representation of a role.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    pub name: String,
    pub id: RoleId,
    pub oid: u32,
    pub attributes: RoleAttributes,
    pub membership: RoleMembership,
    /// Variable values stored on the role; iterate them with [`Role::vars`].
    pub vars: RoleVars,
}
231
232impl Role {
233 pub fn is_user(&self) -> bool {
234 self.id.is_user()
235 }
236
237 pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
238 self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
239 }
240}
241
242impl From<Role> for durable::Role {
243 fn from(role: Role) -> durable::Role {
244 durable::Role {
245 id: role.id,
246 oid: role.oid,
247 name: role.name,
248 attributes: role.attributes,
249 membership: role.membership,
250 vars: role.vars,
251 }
252 }
253}
254
255impl From<durable::Role> for Role {
256 fn from(
257 durable::Role {
258 id,
259 oid,
260 name,
261 attributes,
262 membership,
263 vars,
264 }: durable::Role,
265 ) -> Self {
266 Role {
267 name,
268 id,
269 oid,
270 attributes,
271 membership,
272 vars,
273 }
274 }
275}
276
277impl UpdateFrom<durable::Role> for Role {
278 fn update_from(
279 &mut self,
280 durable::Role {
281 id,
282 oid,
283 name,
284 attributes,
285 membership,
286 vars,
287 }: durable::Role,
288 ) {
289 self.id = id;
290 self.oid = oid;
291 self.name = name;
292 self.attributes = attributes;
293 self.membership = membership;
294 self.vars = vars;
295 }
296}
297
/// In-memory representation of a role's authentication record.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RoleAuth {
    pub role_id: RoleId,
    /// `None` when no password hash is stored.
    pub password_hash: Option<String>,
    // NOTE(review): unit and epoch of this timestamp are not visible here — confirm.
    pub updated_at: u64,
}
304
305impl From<RoleAuth> for durable::RoleAuth {
306 fn from(role_auth: RoleAuth) -> durable::RoleAuth {
307 durable::RoleAuth {
308 role_id: role_auth.role_id,
309 password_hash: role_auth.password_hash,
310 updated_at: role_auth.updated_at,
311 }
312 }
313}
314
315impl From<durable::RoleAuth> for RoleAuth {
316 fn from(
317 durable::RoleAuth {
318 role_id,
319 password_hash,
320 updated_at,
321 }: durable::RoleAuth,
322 ) -> RoleAuth {
323 RoleAuth {
324 role_id,
325 password_hash,
326 updated_at,
327 }
328 }
329}
330
331impl UpdateFrom<durable::RoleAuth> for RoleAuth {
332 fn update_from(&mut self, from: durable::RoleAuth) {
333 self.role_id = from.role_id;
334 self.password_hash = from.password_hash;
335 }
336}
337
/// In-memory representation of a cluster, including its replicas.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    pub name: String,
    pub id: ClusterId,
    pub config: ClusterConfig,
    /// Excluded from serialization.
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    /// Ids of catalog items bound to this cluster.
    pub bound_objects: BTreeSet<CatalogItemId>,
    /// Replica ids keyed by name; queried by [`Cluster::replica_id`].
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    /// Replicas keyed by id; serialized through
    /// `mz_ore::serde::map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
354
355impl Cluster {
356 pub fn role(&self) -> ClusterRole {
358 if self.name == MZ_SYSTEM_CLUSTER.name {
361 ClusterRole::SystemCritical
362 } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
363 ClusterRole::System
364 } else {
365 ClusterRole::User
366 }
367 }
368
369 pub fn is_managed(&self) -> bool {
371 matches!(self.config.variant, ClusterVariant::Managed { .. })
372 }
373
374 pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
376 self.replicas().filter(|r| !r.config.location.internal())
377 }
378
379 pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
381 self.replicas_by_id_.values()
382 }
383
384 pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
386 self.replicas_by_id_.get(&replica_id)
387 }
388
389 pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
391 self.replica_id_by_name_.get(name).copied()
392 }
393
394 pub fn availability_zones(&self) -> Option<&[String]> {
396 match &self.config.variant {
397 ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
398 ClusterVariant::Unmanaged => None,
399 }
400 }
401
402 pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
403 let name = self.name.clone();
404 let variant = match &self.config.variant {
405 ClusterVariant::Managed(ClusterVariantManaged {
406 size,
407 availability_zones,
408 logging,
409 replication_factor,
410 optimizer_feature_overrides,
411 schedule,
412 }) => {
413 let introspection = match logging {
414 ReplicaLogging {
415 log_logging,
416 interval: Some(interval),
417 } => Some(ComputeReplicaIntrospectionConfig {
418 debugging: *log_logging,
419 interval: interval.clone(),
420 }),
421 ReplicaLogging {
422 log_logging: _,
423 interval: None,
424 } => None,
425 };
426 let compute = ComputeReplicaConfig { introspection };
427 CreateClusterVariant::Managed(CreateClusterManagedPlan {
428 replication_factor: replication_factor.clone(),
429 size: size.clone(),
430 availability_zones: availability_zones.clone(),
431 compute,
432 optimizer_feature_overrides: optimizer_feature_overrides.clone(),
433 schedule: schedule.clone(),
434 })
435 }
436 ClusterVariant::Unmanaged => {
437 return Err(PlanError::Unsupported {
440 feature: "SHOW CREATE for unmanaged clusters".to_string(),
441 discussion_no: None,
442 });
443 }
444 };
445 let workload_class = self.config.workload_class.clone();
446 Ok(CreateClusterPlan {
447 name,
448 variant,
449 workload_class,
450 })
451 }
452}
453
454impl From<Cluster> for durable::Cluster {
455 fn from(cluster: Cluster) -> durable::Cluster {
456 durable::Cluster {
457 id: cluster.id,
458 name: cluster.name,
459 owner_id: cluster.owner_id,
460 privileges: cluster.privileges.into_all_values().collect(),
461 config: cluster.config.into(),
462 }
463 }
464}
465
466impl From<durable::Cluster> for Cluster {
467 fn from(
468 durable::Cluster {
469 id,
470 name,
471 owner_id,
472 privileges,
473 config,
474 }: durable::Cluster,
475 ) -> Self {
476 Cluster {
477 name: name.clone(),
478 id,
479 bound_objects: BTreeSet::new(),
480 log_indexes: BTreeMap::new(),
481 replica_id_by_name_: BTreeMap::new(),
482 replicas_by_id_: BTreeMap::new(),
483 owner_id,
484 privileges: PrivilegeMap::from_mz_acl_items(privileges),
485 config: config.into(),
486 }
487 }
488}
489
490impl UpdateFrom<durable::Cluster> for Cluster {
491 fn update_from(
492 &mut self,
493 durable::Cluster {
494 id,
495 name,
496 owner_id,
497 privileges,
498 config,
499 }: durable::Cluster,
500 ) {
501 self.id = id;
502 self.name = name;
503 self.owner_id = owner_id;
504 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
505 self.config = config.into();
506 }
507}
508
/// In-memory representation of a cluster replica.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    pub name: String,
    /// Id of the cluster this replica is recorded under.
    pub cluster_id: ClusterId,
    pub replica_id: ReplicaId,
    pub config: ReplicaConfig,
    pub owner_id: RoleId,
}
517
518impl From<ClusterReplica> for durable::ClusterReplica {
519 fn from(replica: ClusterReplica) -> durable::ClusterReplica {
520 durable::ClusterReplica {
521 cluster_id: replica.cluster_id,
522 replica_id: replica.replica_id,
523 name: replica.name,
524 config: replica.config.into(),
525 owner_id: replica.owner_id,
526 }
527 }
528}
529
/// A cluster status paired with a UTC timestamp.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    pub status: ClusterStatus,
    // NOTE(review): presumably the time at which `status` was observed — confirm.
    pub time: DateTime<Utc>,
}
535
/// A list of [`SourceReference`]s together with an update timestamp.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
    // NOTE(review): unit and epoch of this timestamp are not visible here — confirm.
    pub updated_at: u64,
    pub references: Vec<SourceReference>,
}
541
/// A single named reference with an optional namespace and a list of column
/// names.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
    pub name: String,
    /// `None` when the reference has no namespace.
    pub namespace: Option<String>,
    pub columns: Vec<String>,
}
548
549impl From<SourceReference> for durable::SourceReference {
550 fn from(source_reference: SourceReference) -> durable::SourceReference {
551 durable::SourceReference {
552 name: source_reference.name,
553 namespace: source_reference.namespace,
554 columns: source_reference.columns,
555 }
556 }
557}
558
559impl SourceReferences {
560 pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
561 durable::SourceReferences {
562 source_id,
563 updated_at: self.updated_at,
564 references: self.references.into_iter().map(Into::into).collect(),
565 }
566 }
567}
568
569impl From<durable::SourceReference> for SourceReference {
570 fn from(source_reference: durable::SourceReference) -> SourceReference {
571 SourceReference {
572 name: source_reference.name,
573 namespace: source_reference.namespace,
574 columns: source_reference.columns,
575 }
576 }
577}
578
579impl From<durable::SourceReferences> for SourceReferences {
580 fn from(source_references: durable::SourceReferences) -> SourceReferences {
581 SourceReferences {
582 updated_at: source_references.updated_at,
583 references: source_references
584 .references
585 .into_iter()
586 .map(|source_reference| source_reference.into())
587 .collect(),
588 }
589 }
590}
591
592impl From<mz_sql::plan::SourceReference> for SourceReference {
593 fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
594 SourceReference {
595 name: source_reference.name,
596 namespace: source_reference.namespace,
597 columns: source_reference.columns,
598 }
599 }
600}
601
602impl From<mz_sql::plan::SourceReferences> for SourceReferences {
603 fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
604 SourceReferences {
605 updated_at: source_references.updated_at,
606 references: source_references
607 .references
608 .into_iter()
609 .map(|source_reference| source_reference.into())
610 .collect(),
611 }
612 }
613}
614
615impl From<SourceReferences> for mz_sql::plan::SourceReferences {
616 fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
617 mz_sql::plan::SourceReferences {
618 updated_at: source_references.updated_at,
619 references: source_references
620 .references
621 .into_iter()
622 .map(|source_reference| source_reference.into())
623 .collect(),
624 }
625 }
626}
627
628impl From<SourceReference> for mz_sql::plan::SourceReference {
629 fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
630 mz_sql::plan::SourceReference {
631 name: source_reference.name,
632 namespace: source_reference.namespace,
633 columns: source_reference.columns,
634 }
635 }
636}
637
/// A catalog item together with its identity, ownership and privileges.
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    pub item: CatalogItem,
    /// Excluded from serialization.
    // NOTE(review): presumably the ids of items that reference this entry — confirm.
    #[serde(skip)]
    pub referenced_by: Vec<CatalogItemId>,
    /// Excluded from serialization.
    // NOTE(review): presumably the ids of items that use this entry — confirm.
    #[serde(skip)]
    pub used_by: Vec<CatalogItemId>,
    pub id: CatalogItemId,
    pub oid: u32,
    pub name: QualifiedItemName,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
654
/// A [`CatalogEntry`] paired with a [`RelationVersionSelector`].
///
/// The selector is passed to the version-dependent lookups
/// (`relation_desc`, `global_id_for_version`) made through this type.
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
    pub entry: CatalogEntry,
    pub version: RelationVersionSelector,
}
677
678impl CatalogCollectionEntry {
679 pub fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
680 self.item().relation_desc(self.version)
681 }
682}
683
684impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
685 fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
686 self.item().relation_desc(self.version)
687 }
688
689 fn global_id(&self) -> GlobalId {
690 self.entry
691 .item()
692 .global_id_for_version(self.version)
693 .expect("catalog corruption, missing version!")
694 }
695}
696
697impl Deref for CatalogCollectionEntry {
698 type Target = CatalogEntry;
699
700 fn deref(&self) -> &CatalogEntry {
701 &self.entry
702 }
703}
704
/// Implements the SQL-layer catalog item interface by delegating every method
/// to the wrapped [`CatalogEntry`].
///
/// Only `at_version` involves `self.version`: it returns a new
/// [`CatalogCollectionEntry`] carrying the requested selector.
impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
    fn name(&self) -> &QualifiedItemName {
        self.entry.name()
    }

    fn id(&self) -> CatalogItemId {
        self.entry.id()
    }

    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.entry.global_ids())
    }

    fn oid(&self) -> u32 {
        self.entry.oid()
    }

    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.entry.func()
    }

    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.entry.source_desc()
    }

    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        // Called through the fully qualified trait path rather than method syntax.
        // NOTE(review): presumably to avoid resolving to an inherent
        // `CatalogEntry::connection` — confirm.
        mz_sql::catalog::CatalogItem::connection(&self.entry)
    }

    fn create_sql(&self) -> &str {
        self.entry.create_sql()
    }

    fn item_type(&self) -> SqlCatalogItemType {
        self.entry.item_type()
    }

    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        self.entry.index_details()
    }

    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        self.entry.writable_table_details()
    }

    fn replacement_target(&self) -> Option<CatalogItemId> {
        self.entry.replacement_target()
    }

    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        self.entry.type_details()
    }

    fn references(&self) -> &ResolvedIds {
        self.entry.references()
    }

    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.entry.uses()
    }

    fn referenced_by(&self) -> &[CatalogItemId] {
        self.entry.referenced_by()
    }

    fn used_by(&self) -> &[CatalogItemId] {
        self.entry.used_by()
    }

    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.entry.subsource_details()
    }

    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.entry.source_export_details()
    }

    fn is_progress_source(&self) -> bool {
        self.entry.is_progress_source()
    }

    fn progress_id(&self) -> Option<CatalogItemId> {
        self.entry.progress_id()
    }

    fn owner_id(&self) -> RoleId {
        // The entry's accessor yields a reference; the trait returns the id by value.
        *self.entry.owner_id()
    }

    fn privileges(&self) -> &PrivilegeMap {
        self.entry.privileges()
    }

    fn cluster_id(&self) -> Option<ClusterId> {
        self.entry.item().cluster_id()
    }

    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        // Clones the whole entry so that the returned box owns its data.
        Box::new(CatalogCollectionEntry {
            entry: self.entry.clone(),
            version,
        })
    }

    fn latest_version(&self) -> Option<RelationVersion> {
        self.entry.latest_version()
    }
}
828
/// The kinds of item a [`CatalogEntry`] can hold; each variant wraps the
/// struct describing that kind.
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    Table(Table),
    Source(Source),
    Log(Log),
    View(View),
    MaterializedView(MaterializedView),
    Sink(Sink),
    Index(Index),
    Type(Type),
    Func(Func),
    Secret(Secret),
    Connection(Connection),
    ContinualTask(ContinualTask),
}
844
845impl From<CatalogEntry> for durable::Item {
846 fn from(entry: CatalogEntry) -> durable::Item {
847 let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
848 durable::Item {
849 id: entry.id,
850 oid: entry.oid,
851 global_id,
852 schema_id: entry.name.qualifiers.schema_spec.into(),
853 name: entry.name.item,
854 create_sql,
855 owner_id: entry.owner_id,
856 privileges: entry.privileges.into_all_values().collect(),
857 extra_versions,
858 }
859 }
860}
861
/// In-memory representation of a table.
#[derive(Debug, Clone, Serialize)]
pub struct Table {
    /// SQL text for this table, when available.
    pub create_sql: Option<String>,
    /// Versioned relation description; `at_version` yields the description
    /// for one version.
    pub desc: VersionedRelationDesc,
    /// The [`GlobalId`] of each relation version; serialized through
    /// `mz_ore::serde::map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// Excluded from serialization.
    // NOTE(review): presumably the connection owning a temporary table — confirm.
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    pub resolved_ids: ResolvedIds,
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    pub is_retained_metrics_object: bool,
    /// Where the table's data comes from; see [`TableDataSource`].
    pub data_source: TableDataSource,
}
886
887impl Table {
888 pub fn timeline(&self) -> Timeline {
889 match &self.data_source {
890 TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
893 TableDataSource::DataSource { timeline, .. } => timeline.clone(),
894 }
895 }
896
897 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
899 self.collections.values().copied()
900 }
901
902 pub fn global_id_writes(&self) -> GlobalId {
904 *self
905 .collections
906 .last_key_value()
907 .expect("at least one version of a table")
908 .1
909 }
910
911 pub fn collection_descs(
913 &self,
914 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
915 self.collections.iter().map(|(version, gid)| {
916 let desc = self
917 .desc
918 .at_version(RelationVersionSelector::Specific(*version));
919 (*gid, *version, desc)
920 })
921 }
922
923 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
925 let (version, _gid) = self
926 .collections
927 .iter()
928 .find(|(_version, gid)| *gid == id)
929 .expect("GlobalId to exist");
930 self.desc
931 .at_version(RelationVersionSelector::Specific(*version))
932 }
933}
934
/// Where a [`Table`]'s data comes from.
#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
    /// The table is populated by table writes; `Table::timeline` reports
    /// `Timeline::EpochMilliseconds` for this variant.
    TableWrites {
        /// Excluded from serialization.
        // NOTE(review): presumably one default expression per column — confirm.
        #[serde(skip)]
        defaults: Vec<Expr<Aug>>,
    },

    /// The table is backed by a [`DataSourceDesc`] with its own timeline.
    DataSource {
        desc: DataSourceDesc,
        timeline: Timeline,
    },
}
950
/// Describes the origin of the data in a source-like collection.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum DataSourceDesc {
    /// An ingestion described by `desc`, associated with `cluster_id`.
    Ingestion {
        desc: SourceDesc<ReferencedConnection>,
        cluster_id: ClusterId,
    },
    /// An ingestion that additionally carries its own export configuration
    /// and the id of a progress subsource.
    OldSyntaxIngestion {
        desc: SourceDesc<ReferencedConnection>,
        cluster_id: ClusterId,
        progress_subsource: CatalogItemId,
        data_config: SourceExportDataConfig<ReferencedConnection>,
        details: SourceExportDetails,
    },
    /// An export of the ingestion identified by `ingestion_id`.
    IngestionExport {
        ingestion_id: CatalogItemId,
        external_reference: UnresolvedItemName,
        details: SourceExportDetails,
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// An introspection collection of the given type.
    Introspection(IntrospectionType),
    /// A progress collection; `Source::source_type` reports it as `"progress"`.
    Progress,
    /// A webhook source associated with `cluster_id`.
    Webhook {
        validate_using: Option<WebhookValidation>,
        body_format: WebhookBodyFormat,
        headers: WebhookHeaders,
        cluster_id: ClusterId,
    },
    // NOTE(review): presumably data originating from the catalog itself — confirm.
    Catalog,
}
999
1000impl From<IntrospectionType> for DataSourceDesc {
1001 fn from(typ: IntrospectionType) -> Self {
1002 Self::Introspection(typ)
1003 }
1004}
1005
1006impl DataSourceDesc {
1007 pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1009 match &self {
1010 DataSourceDesc::Ingestion { .. } => (None, None),
1011 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1012 match &data_config.encoding.as_ref() {
1013 Some(encoding) => match &encoding.key {
1014 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1015 None => (None, Some(encoding.value.type_())),
1016 },
1017 None => (None, None),
1018 }
1019 }
1020 DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1021 Some(encoding) => match &encoding.key {
1022 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1023 None => (None, Some(encoding.value.type_())),
1024 },
1025 None => (None, None),
1026 },
1027 DataSourceDesc::Introspection(_)
1028 | DataSourceDesc::Webhook { .. }
1029 | DataSourceDesc::Progress
1030 | DataSourceDesc::Catalog => (None, None),
1031 }
1032 }
1033
1034 pub fn envelope(&self) -> Option<&str> {
1036 fn envelope_string(envelope: &SourceEnvelope) -> &str {
1041 match envelope {
1042 SourceEnvelope::None(_) => "none",
1043 SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1044 mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1045 mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1046 "debezium"
1050 }
1051 mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1052 "upsert-value-err-inline"
1053 }
1054 },
1055 SourceEnvelope::CdcV2 => {
1056 "materialize"
1059 }
1060 }
1061 }
1062
1063 match self {
1064 DataSourceDesc::Ingestion { .. } => None,
1069 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1070 Some(envelope_string(&data_config.envelope))
1071 }
1072 DataSourceDesc::IngestionExport { data_config, .. } => {
1073 Some(envelope_string(&data_config.envelope))
1074 }
1075 DataSourceDesc::Introspection(_)
1076 | DataSourceDesc::Webhook { .. }
1077 | DataSourceDesc::Progress
1078 | DataSourceDesc::Catalog => None,
1079 }
1080 }
1081}
1082
/// In-memory representation of a source.
#[derive(Debug, Clone, Serialize)]
pub struct Source {
    /// SQL text for this source, when available.
    pub create_sql: Option<String>,
    /// The source's [`GlobalId`], returned by [`Source::global_id`].
    pub global_id: GlobalId,
    /// Where the source's data comes from. Excluded from serialization.
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    pub desc: RelationDesc,
    pub timeline: Timeline,
    pub resolved_ids: ResolvedIds,
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    pub is_retained_metrics_object: bool,
}
1106
1107impl Source {
1108 pub fn new(
1115 plan: CreateSourcePlan,
1116 global_id: GlobalId,
1117 resolved_ids: ResolvedIds,
1118 custom_logical_compaction_window: Option<CompactionWindow>,
1119 is_retained_metrics_object: bool,
1120 ) -> Source {
1121 Source {
1122 create_sql: Some(plan.source.create_sql),
1123 data_source: match plan.source.data_source {
1124 mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1125 desc,
1126 cluster_id: plan
1127 .in_cluster
1128 .expect("ingestion-based sources must be given a cluster ID"),
1129 },
1130 mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1131 desc,
1132 progress_subsource,
1133 data_config,
1134 details,
1135 } => DataSourceDesc::OldSyntaxIngestion {
1136 desc,
1137 cluster_id: plan
1138 .in_cluster
1139 .expect("ingestion-based sources must be given a cluster ID"),
1140 progress_subsource,
1141 data_config,
1142 details,
1143 },
1144 mz_sql::plan::DataSourceDesc::Progress => {
1145 assert!(
1146 plan.in_cluster.is_none(),
1147 "subsources must not have a host config or cluster_id defined"
1148 );
1149 DataSourceDesc::Progress
1150 }
1151 mz_sql::plan::DataSourceDesc::IngestionExport {
1152 ingestion_id,
1153 external_reference,
1154 details,
1155 data_config,
1156 } => {
1157 assert!(
1158 plan.in_cluster.is_none(),
1159 "subsources must not have a host config or cluster_id defined"
1160 );
1161 DataSourceDesc::IngestionExport {
1162 ingestion_id,
1163 external_reference,
1164 details,
1165 data_config,
1166 }
1167 }
1168 mz_sql::plan::DataSourceDesc::Webhook {
1169 validate_using,
1170 body_format,
1171 headers,
1172 cluster_id,
1173 } => {
1174 mz_ore::soft_assert_or_log!(
1175 cluster_id.is_none(),
1176 "cluster_id set at Source level for Webhooks"
1177 );
1178 DataSourceDesc::Webhook {
1179 validate_using,
1180 body_format,
1181 headers,
1182 cluster_id: plan
1183 .in_cluster
1184 .expect("webhook sources must be given a cluster ID"),
1185 }
1186 }
1187 },
1188 desc: plan.source.desc,
1189 global_id,
1190 timeline: plan.timeline,
1191 resolved_ids,
1192 custom_logical_compaction_window: plan
1193 .source
1194 .compaction_window
1195 .or(custom_logical_compaction_window),
1196 is_retained_metrics_object,
1197 }
1198 }
1199
1200 pub fn source_type(&self) -> &str {
1202 match &self.data_source {
1203 DataSourceDesc::Ingestion { desc, .. }
1204 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
1205 DataSourceDesc::Progress => "progress",
1206 DataSourceDesc::IngestionExport { .. } => "subsource",
1207 DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => "source",
1208 DataSourceDesc::Webhook { .. } => "webhook",
1209 }
1210 }
1211
1212 pub fn connection_id(&self) -> Option<CatalogItemId> {
1214 match &self.data_source {
1215 DataSourceDesc::Ingestion { desc, .. }
1216 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
1217 DataSourceDesc::IngestionExport { .. }
1218 | DataSourceDesc::Introspection(_)
1219 | DataSourceDesc::Webhook { .. }
1220 | DataSourceDesc::Progress
1221 | DataSourceDesc::Catalog => None,
1222 }
1223 }
1224
1225 pub fn global_id(&self) -> GlobalId {
1227 self.global_id
1228 }
1229
1230 pub fn user_controllable_persist_shard_count(&self) -> i64 {
1238 match &self.data_source {
1239 DataSourceDesc::Ingestion { .. } => 0,
1240 DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1241 match &desc.connection {
1242 GenericSourceConnection::Postgres(_)
1246 | GenericSourceConnection::MySql(_)
1247 | GenericSourceConnection::SqlServer(_) => 0,
1248 GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
1249 LoadGenerator::Clock
1251 | LoadGenerator::Counter { .. }
1252 | LoadGenerator::Datums
1253 | LoadGenerator::KeyValue(_) => 1,
1254 LoadGenerator::Auction
1255 | LoadGenerator::Marketing
1256 | LoadGenerator::Tpch { .. } => 0,
1257 },
1258 GenericSourceConnection::Kafka(_) => 1,
1259 }
1260 }
1261 DataSourceDesc::IngestionExport { .. } => 1,
1264 DataSourceDesc::Webhook { .. } => 1,
1265 DataSourceDesc::Introspection(_)
1268 | DataSourceDesc::Progress
1269 | DataSourceDesc::Catalog => 0,
1270 }
1271 }
1272}
1273
/// In-memory representation of a log collection.
#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// Which log this is.
    pub variant: LogVariant,
    /// The log's [`GlobalId`], returned by [`Log::global_id`].
    pub global_id: GlobalId,
}
1281
1282impl Log {
1283 pub fn global_id(&self) -> GlobalId {
1285 self.global_id
1286 }
1287}
1288
/// In-memory representation of a sink.
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    /// SQL text for this sink.
    pub create_sql: String,
    /// The sink's own [`GlobalId`], returned by [`Sink::global_id`].
    pub global_id: GlobalId,
    // NOTE(review): presumably the collection the sink reads from — confirm.
    pub from: GlobalId,
    /// Connection to the external system; determines `sink_type` and the formats.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    pub envelope: SinkEnvelope,
    pub with_snapshot: bool,
    pub version: u64,
    pub resolved_ids: ResolvedIds,
    pub cluster_id: ClusterId,
    pub commit_interval: Option<Duration>,
}
1314
1315impl Sink {
1316 pub fn sink_type(&self) -> &str {
1317 self.connection.name()
1318 }
1319
1320 pub fn envelope(&self) -> Option<&str> {
1322 match &self.envelope {
1323 SinkEnvelope::Debezium => Some("debezium"),
1324 SinkEnvelope::Upsert => Some("upsert"),
1325 SinkEnvelope::Append => Some("append"),
1326 }
1327 }
1328
1329 pub fn combined_format(&self) -> Option<Cow<'_, str>> {
1334 match &self.connection {
1335 StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
1336 StorageSinkConnection::Iceberg(_) => None,
1337 }
1338 }
1339
1340 pub fn formats(&self) -> Option<(Option<&str>, &str)> {
1342 match &self.connection {
1343 StorageSinkConnection::Kafka(connection) => {
1344 let key_format = connection
1345 .format
1346 .key_format
1347 .as_ref()
1348 .map(|f| f.get_format_name());
1349 let value_format = connection.format.value_format.get_format_name();
1350 Some((key_format, value_format))
1351 }
1352 StorageSinkConnection::Iceberg(_) => None,
1353 }
1354 }
1355
1356 pub fn connection_id(&self) -> Option<CatalogItemId> {
1357 self.connection.connection_id()
1358 }
1359
1360 pub fn global_id(&self) -> GlobalId {
1362 self.global_id
1363 }
1364}
1365
/// In-memory catalog representation of a (non-materialized) view.
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// SQL text of the statement defining this view. The rename helpers on
    /// `CatalogItem` parse and rewrite it.
    pub create_sql: String,
    /// The [`GlobalId`] stored for this view.
    pub global_id: GlobalId,
    /// The view's expression in HIR form.
    pub raw_expr: Arc<HirRelationExpr>,
    /// The view's expression in optimized MIR form.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Relation description of the view; returned by `CatalogItem::relation_desc`.
    pub desc: RelationDesc,
    /// Connection id attached to the view; a view with `Some` here is
    /// reported as temporary by `CatalogItem::is_temporary`.
    pub conn_id: Option<ConnectionId>,
    /// Resolved references of this view; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Additional item ids added to the result of `CatalogItem::uses`.
    pub dependencies: DependencyIds,
}
1385
1386impl View {
1387 pub fn global_id(&self) -> GlobalId {
1389 self.global_id
1390 }
1391}
1392
/// In-memory catalog representation of a materialized view.
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// SQL text of the statement defining this materialized view.
    pub create_sql: String,
    /// The [`GlobalId`] of the collection backing each relation version.
    /// The methods on this type expect at least one entry.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// The view's expression in HIR form.
    pub raw_expr: Arc<HirRelationExpr>,
    /// The view's expression in optimized MIR form.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Versioned relation description; `at_version` yields the description
    /// for a specific [`RelationVersion`].
    pub desc: VersionedRelationDesc,
    /// Resolved references of this item; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Additional item ids added to the result of `CatalogItem::uses`.
    pub dependencies: DependencyIds,
    /// If set, the item this materialized view is a replacement for;
    /// `apply_replacement` requires it on the replacement and clears it on
    /// the result.
    pub replacement_target: Option<CatalogItemId>,
    /// Cluster reported for this item by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// NOTE(review): presumably a specific replica the view is pinned to — confirm.
    pub target_replica: Option<ReplicaId>,
    /// NOTE(review): presumably indexes of columns asserted to be non-null — confirm.
    pub non_null_assertions: Vec<usize>,
    /// Custom compaction window, if any; `CatalogItem::update_retain_history`
    /// writes it.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// NOTE(review): presumably the refresh schedule of the view, if any — confirm.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// NOTE(review): presumably the as-of chosen when the view was first created — confirm.
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1430
1431impl MaterializedView {
1432 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1434 self.collections.values().copied()
1435 }
1436
1437 pub fn global_id_writes(&self) -> GlobalId {
1440 *self
1441 .collections
1442 .last_key_value()
1443 .expect("at least one version of a materialized view")
1444 .1
1445 }
1446
1447 pub fn collection_descs(
1449 &self,
1450 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
1451 self.collections.iter().map(|(version, gid)| {
1452 let desc = self
1453 .desc
1454 .at_version(RelationVersionSelector::Specific(*version));
1455 (*gid, *version, desc)
1456 })
1457 }
1458
1459 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
1461 let (version, _gid) = self
1462 .collections
1463 .iter()
1464 .find(|(_version, gid)| *gid == id)
1465 .expect("GlobalId to exist");
1466 self.desc
1467 .at_version(RelationVersionSelector::Specific(*version))
1468 }
1469
1470 pub fn apply_replacement(&mut self, replacement: Self) {
1472 let target_id = replacement
1473 .replacement_target
1474 .expect("replacement has target");
1475
1476 fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
1477 let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
1478 panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
1479 });
1480 if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
1481 cmvs
1482 } else {
1483 panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
1484 }
1485 }
1486
1487 let old_stmt = parse(&self.create_sql);
1488 let rpl_stmt = parse(&replacement.create_sql);
1489 let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
1490 if_exists: old_stmt.if_exists,
1491 name: old_stmt.name,
1492 columns: rpl_stmt.columns,
1493 replacement_for: None,
1494 in_cluster: rpl_stmt.in_cluster,
1495 in_cluster_replica: rpl_stmt.in_cluster_replica,
1496 query: rpl_stmt.query,
1497 as_of: rpl_stmt.as_of,
1498 with_options: rpl_stmt.with_options,
1499 };
1500 let create_sql = new_stmt.to_ast_string_stable();
1501
1502 let mut collections = std::mem::take(&mut self.collections);
1503 let latest_version = collections.keys().max().expect("at least one version");
1507 let new_version = latest_version.bump();
1508 collections.insert(new_version, replacement.global_id_writes());
1509
1510 let mut resolved_ids = replacement.resolved_ids;
1511 resolved_ids.remove_item(&target_id);
1512 let mut dependencies = replacement.dependencies;
1513 dependencies.0.remove(&target_id);
1514
1515 *self = Self {
1516 create_sql,
1517 collections,
1518 raw_expr: replacement.raw_expr,
1519 optimized_expr: replacement.optimized_expr,
1520 desc: replacement.desc,
1521 resolved_ids,
1522 dependencies,
1523 replacement_target: None,
1524 cluster_id: replacement.cluster_id,
1525 target_replica: replacement.target_replica,
1526 non_null_assertions: replacement.non_null_assertions,
1527 custom_logical_compaction_window: replacement.custom_logical_compaction_window,
1528 refresh_schedule: replacement.refresh_schedule,
1529 initial_as_of: replacement.initial_as_of,
1530 };
1531 }
1532}
1533
/// In-memory catalog representation of an index.
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// SQL text of the statement defining this index.
    pub create_sql: String,
    /// The [`GlobalId`] stored for this index.
    pub global_id: GlobalId,
    /// NOTE(review): presumably the collection this index is built on — confirm.
    pub on: GlobalId,
    /// NOTE(review): presumably the key expressions of the index — confirm.
    pub keys: Arc<[MirScalarExpr]>,
    /// Connection id attached to the index; an index with `Some` here is
    /// reported as temporary by `CatalogItem::is_temporary`.
    pub conn_id: Option<ConnectionId>,
    /// Resolved references of this index; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Cluster reported for this index by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// Custom compaction window, if any; `CatalogItem::update_retain_history`
    /// writes it.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Flag returned by `CatalogItem::is_retained_metrics_object`.
    pub is_retained_metrics_object: bool,
}
1558
1559impl Index {
1560 pub fn global_id(&self) -> GlobalId {
1562 self.global_id
1563 }
1564}
1565
/// In-memory catalog representation of a type.
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// SQL text of the statement defining this type. It is `None` for
    /// builtin types (serializing one without SQL panics with "builtin types
    /// cannot be serialized").
    pub create_sql: Option<String>,
    /// The [`GlobalId`] stored for this type.
    pub global_id: GlobalId,
    /// Details of the type; excluded from serialization.
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// Resolved references of this type; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
}
1577
/// In-memory catalog representation of a function.
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// Static function definition; excluded from serialization.
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
    /// The [`GlobalId`] stored for this function.
    pub global_id: GlobalId,
}
1586
/// In-memory catalog representation of a secret.
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// SQL text of the statement defining this secret.
    pub create_sql: String,
    /// The [`GlobalId`] stored for this secret.
    pub global_id: GlobalId,
}
1594
/// In-memory catalog representation of a connection.
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// SQL text of the statement defining this connection.
    pub create_sql: String,
    /// The [`GlobalId`] stored for this connection.
    pub global_id: GlobalId,
    /// Details of the connection as produced by planning.
    pub details: ConnectionDetails,
    /// Resolved references of this connection; returned by
    /// `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
}
1606
1607impl Connection {
1608 pub fn global_id(&self) -> GlobalId {
1610 self.global_id
1611 }
1612}
1613
/// In-memory catalog representation of a continual task.
#[derive(Debug, Clone, Serialize)]
pub struct ContinualTask {
    /// SQL text of the statement defining this continual task.
    pub create_sql: String,
    /// The [`GlobalId`] stored for this continual task.
    pub global_id: GlobalId,
    /// NOTE(review): presumably the collection the task reads its input from — confirm.
    pub input_id: GlobalId,
    /// NOTE(review): presumably whether the input's snapshot is processed — confirm.
    pub with_snapshot: bool,
    /// The task's expression in HIR form.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Relation description of the task; returned by `CatalogItem::relation_desc`.
    pub desc: RelationDesc,
    /// Resolved references of this task; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Additional item ids added to the result of `CatalogItem::uses`.
    pub dependencies: DependencyIds,
    /// Cluster reported for this task by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// NOTE(review): presumably the as-of chosen when the task was first created — confirm.
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1639
1640impl ContinualTask {
1641 pub fn global_id(&self) -> GlobalId {
1643 self.global_id
1644 }
1645}
1646
/// In-memory catalog representation of a network policy. It converts to and
/// from `durable::NetworkPolicy`, which stores the same fields with the
/// privileges as a collection of ACL items.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
    /// Name of the policy.
    pub name: String,
    /// Id of the policy.
    pub id: NetworkPolicyId,
    /// OID of the policy.
    pub oid: u32,
    /// Rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
    /// Role that owns the policy.
    pub owner_id: RoleId,
    /// Privileges on the policy.
    pub privileges: PrivilegeMap,
}
1656
1657impl From<NetworkPolicy> for durable::NetworkPolicy {
1658 fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1659 durable::NetworkPolicy {
1660 id: policy.id,
1661 oid: policy.oid,
1662 name: policy.name,
1663 rules: policy.rules,
1664 owner_id: policy.owner_id,
1665 privileges: policy.privileges.into_all_values().collect(),
1666 }
1667 }
1668}
1669
1670impl From<durable::NetworkPolicy> for NetworkPolicy {
1671 fn from(
1672 durable::NetworkPolicy {
1673 id,
1674 oid,
1675 name,
1676 rules,
1677 owner_id,
1678 privileges,
1679 }: durable::NetworkPolicy,
1680 ) -> Self {
1681 NetworkPolicy {
1682 id,
1683 oid,
1684 name,
1685 rules,
1686 owner_id,
1687 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1688 }
1689 }
1690}
1691
1692impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1693 fn update_from(
1694 &mut self,
1695 durable::NetworkPolicy {
1696 id,
1697 oid,
1698 name,
1699 rules,
1700 owner_id,
1701 privileges,
1702 }: durable::NetworkPolicy,
1703 ) {
1704 self.id = id;
1705 self.oid = oid;
1706 self.name = name;
1707 self.rules = rules;
1708 self.owner_id = owner_id;
1709 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1710 }
1711}
1712
1713impl CatalogItem {
1714 pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1716 match self {
1717 CatalogItem::Table(_) => CatalogItemType::Table,
1718 CatalogItem::Source(_) => CatalogItemType::Source,
1719 CatalogItem::Log(_) => CatalogItemType::Source,
1720 CatalogItem::Sink(_) => CatalogItemType::Sink,
1721 CatalogItem::View(_) => CatalogItemType::View,
1722 CatalogItem::MaterializedView(_) => CatalogItemType::MaterializedView,
1723 CatalogItem::Index(_) => CatalogItemType::Index,
1724 CatalogItem::Type(_) => CatalogItemType::Type,
1725 CatalogItem::Func(_) => CatalogItemType::Func,
1726 CatalogItem::Secret(_) => CatalogItemType::Secret,
1727 CatalogItem::Connection(_) => CatalogItemType::Connection,
1728 CatalogItem::ContinualTask(_) => CatalogItemType::ContinualTask,
1729 }
1730 }
1731
1732 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1734 let gid = match self {
1735 CatalogItem::Source(source) => source.global_id,
1736 CatalogItem::Log(log) => log.global_id,
1737 CatalogItem::Sink(sink) => sink.global_id,
1738 CatalogItem::View(view) => view.global_id,
1739 CatalogItem::MaterializedView(mv) => {
1740 return itertools::Either::Left(mv.collections.values().copied());
1741 }
1742 CatalogItem::ContinualTask(ct) => ct.global_id,
1743 CatalogItem::Index(index) => index.global_id,
1744 CatalogItem::Func(func) => func.global_id,
1745 CatalogItem::Type(ty) => ty.global_id,
1746 CatalogItem::Secret(secret) => secret.global_id,
1747 CatalogItem::Connection(conn) => conn.global_id,
1748 CatalogItem::Table(table) => {
1749 return itertools::Either::Left(table.collections.values().copied());
1750 }
1751 };
1752 itertools::Either::Right(std::iter::once(gid))
1753 }
1754
1755 pub fn latest_global_id(&self) -> GlobalId {
1759 match self {
1760 CatalogItem::Source(source) => source.global_id,
1761 CatalogItem::Log(log) => log.global_id,
1762 CatalogItem::Sink(sink) => sink.global_id,
1763 CatalogItem::View(view) => view.global_id,
1764 CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
1765 CatalogItem::ContinualTask(ct) => ct.global_id,
1766 CatalogItem::Index(index) => index.global_id,
1767 CatalogItem::Func(func) => func.global_id,
1768 CatalogItem::Type(ty) => ty.global_id,
1769 CatalogItem::Secret(secret) => secret.global_id,
1770 CatalogItem::Connection(conn) => conn.global_id,
1771 CatalogItem::Table(table) => table.global_id_writes(),
1772 }
1773 }
1774
1775 pub fn is_storage_collection(&self) -> bool {
1777 match self {
1778 CatalogItem::Table(_)
1779 | CatalogItem::Source(_)
1780 | CatalogItem::MaterializedView(_)
1781 | CatalogItem::Sink(_)
1782 | CatalogItem::ContinualTask(_) => true,
1783 CatalogItem::Log(_)
1784 | CatalogItem::View(_)
1785 | CatalogItem::Index(_)
1786 | CatalogItem::Type(_)
1787 | CatalogItem::Func(_)
1788 | CatalogItem::Secret(_)
1789 | CatalogItem::Connection(_) => false,
1790 }
1791 }
1792
1793 pub fn relation_desc(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1802 match &self {
1803 CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1804 CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1805 CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1806 CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1807 CatalogItem::MaterializedView(mview) => {
1808 Some(Cow::Owned(mview.desc.at_version(version)))
1809 }
1810 CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
1811 CatalogItem::Func(_)
1812 | CatalogItem::Index(_)
1813 | CatalogItem::Sink(_)
1814 | CatalogItem::Secret(_)
1815 | CatalogItem::Connection(_)
1816 | CatalogItem::Type(_) => None,
1817 }
1818 }
1819
1820 pub fn func(
1821 &self,
1822 entry: &CatalogEntry,
1823 ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1824 match &self {
1825 CatalogItem::Func(func) => Ok(func.inner),
1826 _ => Err(SqlCatalogError::UnexpectedType {
1827 name: entry.name().item.to_string(),
1828 actual_type: entry.item_type(),
1829 expected_type: CatalogItemType::Func,
1830 }),
1831 }
1832 }
1833
1834 pub fn source_desc(
1835 &self,
1836 entry: &CatalogEntry,
1837 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1838 match &self {
1839 CatalogItem::Source(source) => match &source.data_source {
1840 DataSourceDesc::Ingestion { desc, .. }
1841 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1842 DataSourceDesc::IngestionExport { .. }
1843 | DataSourceDesc::Introspection(_)
1844 | DataSourceDesc::Webhook { .. }
1845 | DataSourceDesc::Progress
1846 | DataSourceDesc::Catalog => Ok(None),
1847 },
1848 _ => Err(SqlCatalogError::UnexpectedType {
1849 name: entry.name().item.to_string(),
1850 actual_type: entry.item_type(),
1851 expected_type: CatalogItemType::Source,
1852 }),
1853 }
1854 }
1855
1856 pub fn is_progress_source(&self) -> bool {
1858 matches!(
1859 self,
1860 CatalogItem::Source(Source {
1861 data_source: DataSourceDesc::Progress,
1862 ..
1863 })
1864 )
1865 }
1866
1867 pub fn references(&self) -> &ResolvedIds {
1870 static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1871 match self {
1872 CatalogItem::Func(_) => &*EMPTY,
1873 CatalogItem::Index(idx) => &idx.resolved_ids,
1874 CatalogItem::Sink(sink) => &sink.resolved_ids,
1875 CatalogItem::Source(source) => &source.resolved_ids,
1876 CatalogItem::Log(_) => &*EMPTY,
1877 CatalogItem::Table(table) => &table.resolved_ids,
1878 CatalogItem::Type(typ) => &typ.resolved_ids,
1879 CatalogItem::View(view) => &view.resolved_ids,
1880 CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1881 CatalogItem::Secret(_) => &*EMPTY,
1882 CatalogItem::Connection(connection) => &connection.resolved_ids,
1883 CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
1884 }
1885 }
1886
1887 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1893 let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1894 match self {
1895 CatalogItem::Func(_) => {}
1898 CatalogItem::Index(_) => {}
1899 CatalogItem::Sink(_) => {}
1900 CatalogItem::Source(_) => {}
1901 CatalogItem::Log(_) => {}
1902 CatalogItem::Table(_) => {}
1903 CatalogItem::Type(_) => {}
1904 CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1905 CatalogItem::MaterializedView(mview) => {
1906 uses.extend(mview.dependencies.0.iter().copied())
1907 }
1908 CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
1909 CatalogItem::Secret(_) => {}
1910 CatalogItem::Connection(_) => {}
1911 }
1912 uses
1913 }
1914
1915 pub fn conn_id(&self) -> Option<&ConnectionId> {
1918 match self {
1919 CatalogItem::View(view) => view.conn_id.as_ref(),
1920 CatalogItem::Index(index) => index.conn_id.as_ref(),
1921 CatalogItem::Table(table) => table.conn_id.as_ref(),
1922 CatalogItem::Log(_)
1923 | CatalogItem::Source(_)
1924 | CatalogItem::Sink(_)
1925 | CatalogItem::MaterializedView(_)
1926 | CatalogItem::Secret(_)
1927 | CatalogItem::Type(_)
1928 | CatalogItem::Func(_)
1929 | CatalogItem::Connection(_)
1930 | CatalogItem::ContinualTask(_) => None,
1931 }
1932 }
1933
1934 pub fn set_conn_id(&mut self, conn_id: Option<ConnectionId>) {
1937 match self {
1938 CatalogItem::View(view) => view.conn_id = conn_id,
1939 CatalogItem::Index(index) => index.conn_id = conn_id,
1940 CatalogItem::Table(table) => table.conn_id = conn_id,
1941 CatalogItem::Log(_)
1942 | CatalogItem::Source(_)
1943 | CatalogItem::Sink(_)
1944 | CatalogItem::MaterializedView(_)
1945 | CatalogItem::Secret(_)
1946 | CatalogItem::Type(_)
1947 | CatalogItem::Func(_)
1948 | CatalogItem::Connection(_)
1949 | CatalogItem::ContinualTask(_) => (),
1950 }
1951 }
1952
1953 pub fn is_temporary(&self) -> bool {
1955 self.conn_id().is_some()
1956 }
1957
1958 pub fn rename_schema_refs(
1959 &self,
1960 database_name: &str,
1961 cur_schema_name: &str,
1962 new_schema_name: &str,
1963 ) -> Result<CatalogItem, (String, String)> {
1964 let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
1965 let mut create_stmt = mz_sql::parse::parse(&create_sql)
1966 .expect("invalid create sql persisted to catalog")
1967 .into_element()
1968 .ast;
1969
1970 mz_sql::ast::transform::create_stmt_rename_schema_refs(
1972 &mut create_stmt,
1973 database_name,
1974 cur_schema_name,
1975 new_schema_name,
1976 )?;
1977
1978 Ok(create_stmt.to_ast_string_stable())
1979 };
1980
1981 match self {
1982 CatalogItem::Table(i) => {
1983 let mut i = i.clone();
1984 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1985 Ok(CatalogItem::Table(i))
1986 }
1987 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1988 CatalogItem::Source(i) => {
1989 let mut i = i.clone();
1990 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1991 Ok(CatalogItem::Source(i))
1992 }
1993 CatalogItem::Sink(i) => {
1994 let mut i = i.clone();
1995 i.create_sql = do_rewrite(i.create_sql)?;
1996 Ok(CatalogItem::Sink(i))
1997 }
1998 CatalogItem::View(i) => {
1999 let mut i = i.clone();
2000 i.create_sql = do_rewrite(i.create_sql)?;
2001 Ok(CatalogItem::View(i))
2002 }
2003 CatalogItem::MaterializedView(i) => {
2004 let mut i = i.clone();
2005 i.create_sql = do_rewrite(i.create_sql)?;
2006 Ok(CatalogItem::MaterializedView(i))
2007 }
2008 CatalogItem::Index(i) => {
2009 let mut i = i.clone();
2010 i.create_sql = do_rewrite(i.create_sql)?;
2011 Ok(CatalogItem::Index(i))
2012 }
2013 CatalogItem::Secret(i) => {
2014 let mut i = i.clone();
2015 i.create_sql = do_rewrite(i.create_sql)?;
2016 Ok(CatalogItem::Secret(i))
2017 }
2018 CatalogItem::Connection(i) => {
2019 let mut i = i.clone();
2020 i.create_sql = do_rewrite(i.create_sql)?;
2021 Ok(CatalogItem::Connection(i))
2022 }
2023 CatalogItem::Type(i) => {
2024 let mut i = i.clone();
2025 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2026 Ok(CatalogItem::Type(i))
2027 }
2028 CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
2029 CatalogItem::ContinualTask(i) => {
2030 let mut i = i.clone();
2031 i.create_sql = do_rewrite(i.create_sql)?;
2032 Ok(CatalogItem::ContinualTask(i))
2033 }
2034 }
2035 }
2036
2037 pub fn rename_item_refs(
2041 &self,
2042 from: FullItemName,
2043 to_item_name: String,
2044 rename_self: bool,
2045 ) -> Result<CatalogItem, String> {
2046 let do_rewrite = |create_sql: String| -> Result<String, String> {
2047 let mut create_stmt = mz_sql::parse::parse(&create_sql)
2048 .expect("invalid create sql persisted to catalog")
2049 .into_element()
2050 .ast;
2051 if rename_self {
2052 mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
2053 }
2054 mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
2056 Ok(create_stmt.to_ast_string_stable())
2057 };
2058
2059 match self {
2060 CatalogItem::Table(i) => {
2061 let mut i = i.clone();
2062 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2063 Ok(CatalogItem::Table(i))
2064 }
2065 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
2066 CatalogItem::Source(i) => {
2067 let mut i = i.clone();
2068 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2069 Ok(CatalogItem::Source(i))
2070 }
2071 CatalogItem::Sink(i) => {
2072 let mut i = i.clone();
2073 i.create_sql = do_rewrite(i.create_sql)?;
2074 Ok(CatalogItem::Sink(i))
2075 }
2076 CatalogItem::View(i) => {
2077 let mut i = i.clone();
2078 i.create_sql = do_rewrite(i.create_sql)?;
2079 Ok(CatalogItem::View(i))
2080 }
2081 CatalogItem::MaterializedView(i) => {
2082 let mut i = i.clone();
2083 i.create_sql = do_rewrite(i.create_sql)?;
2084 Ok(CatalogItem::MaterializedView(i))
2085 }
2086 CatalogItem::Index(i) => {
2087 let mut i = i.clone();
2088 i.create_sql = do_rewrite(i.create_sql)?;
2089 Ok(CatalogItem::Index(i))
2090 }
2091 CatalogItem::Secret(i) => {
2092 let mut i = i.clone();
2093 i.create_sql = do_rewrite(i.create_sql)?;
2094 Ok(CatalogItem::Secret(i))
2095 }
2096 CatalogItem::Func(_) | CatalogItem::Type(_) => {
2097 unreachable!("{}s cannot be renamed", self.typ())
2098 }
2099 CatalogItem::Connection(i) => {
2100 let mut i = i.clone();
2101 i.create_sql = do_rewrite(i.create_sql)?;
2102 Ok(CatalogItem::Connection(i))
2103 }
2104 CatalogItem::ContinualTask(i) => {
2105 let mut i = i.clone();
2106 i.create_sql = do_rewrite(i.create_sql)?;
2107 Ok(CatalogItem::ContinualTask(i))
2108 }
2109 }
2110 }
2111
2112 pub fn replace_item_refs(&self, old_id: CatalogItemId, new_id: CatalogItemId) -> CatalogItem {
2114 let do_rewrite = |create_sql: String| -> String {
2115 let mut create_stmt = mz_sql::parse::parse(&create_sql)
2116 .expect("invalid create sql persisted to catalog")
2117 .into_element()
2118 .ast;
2119 mz_sql::ast::transform::create_stmt_replace_ids(
2120 &mut create_stmt,
2121 &[(old_id, new_id)].into(),
2122 );
2123 create_stmt.to_ast_string_stable()
2124 };
2125
2126 match self {
2127 CatalogItem::Table(i) => {
2128 let mut i = i.clone();
2129 i.create_sql = i.create_sql.map(do_rewrite);
2130 CatalogItem::Table(i)
2131 }
2132 CatalogItem::Log(i) => CatalogItem::Log(i.clone()),
2133 CatalogItem::Source(i) => {
2134 let mut i = i.clone();
2135 i.create_sql = i.create_sql.map(do_rewrite);
2136 CatalogItem::Source(i)
2137 }
2138 CatalogItem::Sink(i) => {
2139 let mut i = i.clone();
2140 i.create_sql = do_rewrite(i.create_sql);
2141 CatalogItem::Sink(i)
2142 }
2143 CatalogItem::View(i) => {
2144 let mut i = i.clone();
2145 i.create_sql = do_rewrite(i.create_sql);
2146 CatalogItem::View(i)
2147 }
2148 CatalogItem::MaterializedView(i) => {
2149 let mut i = i.clone();
2150 i.create_sql = do_rewrite(i.create_sql);
2151 CatalogItem::MaterializedView(i)
2152 }
2153 CatalogItem::Index(i) => {
2154 let mut i = i.clone();
2155 i.create_sql = do_rewrite(i.create_sql);
2156 CatalogItem::Index(i)
2157 }
2158 CatalogItem::Secret(i) => {
2159 let mut i = i.clone();
2160 i.create_sql = do_rewrite(i.create_sql);
2161 CatalogItem::Secret(i)
2162 }
2163 CatalogItem::Func(_) | CatalogItem::Type(_) => {
2164 unreachable!("references of {}s cannot be replaced", self.typ())
2165 }
2166 CatalogItem::Connection(i) => {
2167 let mut i = i.clone();
2168 i.create_sql = do_rewrite(i.create_sql);
2169 CatalogItem::Connection(i)
2170 }
2171 CatalogItem::ContinualTask(i) => {
2172 let mut i = i.clone();
2173 i.create_sql = do_rewrite(i.create_sql);
2174 CatalogItem::ContinualTask(i)
2175 }
2176 }
2177 }
2178 pub fn update_retain_history(
2181 &mut self,
2182 value: Option<Value>,
2183 window: CompactionWindow,
2184 ) -> Result<Option<WithOptionValue<Raw>>, ()> {
2185 let update = |mut ast: &mut Statement<Raw>| {
2186 macro_rules! update_retain_history {
2188 ( $stmt:ident, $opt:ident, $name:ident ) => {{
2189 let pos = $stmt
2191 .with_options
2192 .iter()
2193 .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
2195 if let Some(value) = value {
2196 let next = mz_sql_parser::ast::$opt {
2197 name: mz_sql_parser::ast::$name::RetainHistory,
2198 value: Some(WithOptionValue::RetainHistoryFor(value)),
2199 };
2200 if let Some(idx) = pos {
2201 let previous = $stmt.with_options[idx].clone();
2202 $stmt.with_options[idx] = next;
2203 previous.value
2204 } else {
2205 $stmt.with_options.push(next);
2206 None
2207 }
2208 } else {
2209 if let Some(idx) = pos {
2210 $stmt.with_options.swap_remove(idx).value
2211 } else {
2212 None
2213 }
2214 }
2215 }};
2216 }
2217 let previous = match &mut ast {
2218 Statement::CreateTable(stmt) => {
2219 update_retain_history!(stmt, TableOption, TableOptionName)
2220 }
2221 Statement::CreateIndex(stmt) => {
2222 update_retain_history!(stmt, IndexOption, IndexOptionName)
2223 }
2224 Statement::CreateSource(stmt) => {
2225 update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
2226 }
2227 Statement::CreateMaterializedView(stmt) => {
2228 update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
2229 }
2230 _ => {
2231 return Err(());
2232 }
2233 };
2234 Ok(previous)
2235 };
2236
2237 let res = self.update_sql(update)?;
2238 let cw = self
2239 .custom_logical_compaction_window_mut()
2240 .expect("item must have compaction window");
2241 *cw = Some(window);
2242 Ok(res)
2243 }
2244
2245 pub fn update_timestamp_interval(
2247 &mut self,
2248 value: Option<Value>,
2249 interval: Duration,
2250 ) -> Result<(), ()> {
2251 let update = |ast: &mut Statement<Raw>| {
2252 match ast {
2253 Statement::CreateSource(stmt) => {
2254 let pos = stmt.with_options.iter().rposition(|o| {
2255 o.name == mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval
2256 });
2257 if let Some(value) = value {
2258 let next = mz_sql_parser::ast::CreateSourceOption {
2259 name: mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval,
2260 value: Some(WithOptionValue::Value(value)),
2261 };
2262 if let Some(idx) = pos {
2263 stmt.with_options[idx] = next;
2264 } else {
2265 stmt.with_options.push(next);
2266 }
2267 } else {
2268 if let Some(idx) = pos {
2269 stmt.with_options.swap_remove(idx);
2270 }
2271 }
2272 }
2273 _ => return Err(()),
2274 };
2275 Ok(())
2276 };
2277
2278 self.update_sql(update)?;
2279
2280 match self {
2282 CatalogItem::Source(source) => {
2283 match &mut source.data_source {
2284 DataSourceDesc::Ingestion { desc, .. }
2285 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
2286 desc.timestamp_interval = interval;
2287 }
2288 _ => return Err(()),
2289 }
2290 Ok(())
2291 }
2292 _ => Err(()),
2293 }
2294 }
2295
2296 pub fn add_column(
2297 &mut self,
2298 name: ColumnName,
2299 typ: SqlColumnType,
2300 sql: RawDataType,
2301 ) -> Result<RelationVersion, PlanError> {
2302 let CatalogItem::Table(table) = self else {
2303 return Err(PlanError::Unsupported {
2304 feature: "adding columns to a non-Table".to_string(),
2305 discussion_no: None,
2306 });
2307 };
2308 let next_version = table.desc.add_column(name.clone(), typ);
2309
2310 let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2311 Statement::CreateTable(stmt) => {
2312 let version = ColumnOptionDef {
2313 name: None,
2314 option: ColumnOption::Versioned {
2315 action: ColumnVersioned::Added,
2316 version: next_version.into(),
2317 },
2318 };
2319 let column = ColumnDef {
2320 name: name.into(),
2321 data_type: sql,
2322 collation: None,
2323 options: vec![version],
2324 };
2325 stmt.columns.push(column);
2326 Ok(())
2327 }
2328 _ => Err(()),
2329 };
2330
2331 self.update_sql(update)
2332 .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2333 Ok(next_version)
2334 }
2335
2336 pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2339 where
2340 F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2341 {
2342 let create_sql = match self {
2343 CatalogItem::Table(Table { create_sql, .. })
2344 | CatalogItem::Type(Type { create_sql, .. })
2345 | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2346 CatalogItem::Sink(Sink { create_sql, .. })
2347 | CatalogItem::View(View { create_sql, .. })
2348 | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2349 | CatalogItem::Index(Index { create_sql, .. })
2350 | CatalogItem::Secret(Secret { create_sql, .. })
2351 | CatalogItem::Connection(Connection { create_sql, .. })
2352 | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
2353 CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2354 };
2355 let Some(create_sql) = create_sql else {
2356 return Err(());
2357 };
2358 let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2359 .expect("non-system items must be parseable")
2360 .into_element()
2361 .ast;
2362 debug!("rewrite: {}", ast.to_ast_string_redacted());
2363 let t = f(&mut ast)?;
2364 *create_sql = ast.to_ast_string_stable();
2365 debug!("rewrote: {}", ast.to_ast_string_redacted());
2366 Ok(t)
2367 }
2368
2369 pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2376 match self {
2377 CatalogItem::Index(index) => Some(index.cluster_id),
2378 CatalogItem::Table(_)
2379 | CatalogItem::Source(_)
2380 | CatalogItem::Log(_)
2381 | CatalogItem::View(_)
2382 | CatalogItem::MaterializedView(_)
2383 | CatalogItem::Sink(_)
2384 | CatalogItem::Type(_)
2385 | CatalogItem::Func(_)
2386 | CatalogItem::Secret(_)
2387 | CatalogItem::Connection(_)
2388 | CatalogItem::ContinualTask(_) => None,
2389 }
2390 }
2391
2392 pub fn cluster_id(&self) -> Option<ClusterId> {
2393 match self {
2394 CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2395 CatalogItem::Index(index) => Some(index.cluster_id),
2396 CatalogItem::Source(source) => match &source.data_source {
2397 DataSourceDesc::Ingestion { cluster_id, .. }
2398 | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
2399 DataSourceDesc::IngestionExport { .. } => None,
2403 DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2404 DataSourceDesc::Introspection(_)
2405 | DataSourceDesc::Progress
2406 | DataSourceDesc::Catalog => None,
2407 },
2408 CatalogItem::Sink(sink) => Some(sink.cluster_id),
2409 CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
2410 CatalogItem::Table(_)
2411 | CatalogItem::Log(_)
2412 | CatalogItem::View(_)
2413 | CatalogItem::Type(_)
2414 | CatalogItem::Func(_)
2415 | CatalogItem::Secret(_)
2416 | CatalogItem::Connection(_) => None,
2417 }
2418 }
2419
2420 pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2423 match self {
2424 CatalogItem::Table(table) => table.custom_logical_compaction_window,
2425 CatalogItem::Source(source) => source.custom_logical_compaction_window,
2426 CatalogItem::Index(index) => index.custom_logical_compaction_window,
2427 CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2428 CatalogItem::Log(_)
2429 | CatalogItem::View(_)
2430 | CatalogItem::Sink(_)
2431 | CatalogItem::Type(_)
2432 | CatalogItem::Func(_)
2433 | CatalogItem::Secret(_)
2434 | CatalogItem::Connection(_)
2435 | CatalogItem::ContinualTask(_) => None,
2436 }
2437 }
2438
2439 pub fn custom_logical_compaction_window_mut(
2443 &mut self,
2444 ) -> Option<&mut Option<CompactionWindow>> {
2445 let cw = match self {
2446 CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2447 CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2448 CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2449 CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2450 CatalogItem::Log(_)
2451 | CatalogItem::View(_)
2452 | CatalogItem::Sink(_)
2453 | CatalogItem::Type(_)
2454 | CatalogItem::Func(_)
2455 | CatalogItem::Secret(_)
2456 | CatalogItem::Connection(_)
2457 | CatalogItem::ContinualTask(_) => return None,
2458 };
2459 Some(cw)
2460 }
2461
2462 pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2470 let custom_logical_compaction_window = match self {
2471 CatalogItem::Table(_)
2472 | CatalogItem::Source(_)
2473 | CatalogItem::Index(_)
2474 | CatalogItem::MaterializedView(_)
2475 | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
2476 CatalogItem::Log(_)
2477 | CatalogItem::View(_)
2478 | CatalogItem::Sink(_)
2479 | CatalogItem::Type(_)
2480 | CatalogItem::Func(_)
2481 | CatalogItem::Secret(_)
2482 | CatalogItem::Connection(_) => return None,
2483 };
2484 Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2485 }
2486
2487 pub fn is_retained_metrics_object(&self) -> bool {
2491 match self {
2492 CatalogItem::Table(table) => table.is_retained_metrics_object,
2493 CatalogItem::Source(source) => source.is_retained_metrics_object,
2494 CatalogItem::Index(index) => index.is_retained_metrics_object,
2495 CatalogItem::Log(_)
2496 | CatalogItem::View(_)
2497 | CatalogItem::MaterializedView(_)
2498 | CatalogItem::Sink(_)
2499 | CatalogItem::Type(_)
2500 | CatalogItem::Func(_)
2501 | CatalogItem::Secret(_)
2502 | CatalogItem::Connection(_)
2503 | CatalogItem::ContinualTask(_) => false,
2504 }
2505 }
2506
2507 pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2508 match self {
2509 CatalogItem::Table(table) => {
2510 let create_sql = table
2511 .create_sql
2512 .clone()
2513 .expect("builtin tables cannot be serialized");
2514 let mut collections = table.collections.clone();
2515 let global_id = collections
2516 .remove(&RelationVersion::root())
2517 .expect("at least one version");
2518 (create_sql, global_id, collections)
2519 }
2520 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2521 CatalogItem::Source(source) => {
2522 assert!(
2523 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2524 "cannot serialize introspection/builtin sources",
2525 );
2526 let create_sql = source
2527 .create_sql
2528 .clone()
2529 .expect("builtin sources cannot be serialized");
2530 (create_sql, source.global_id, BTreeMap::new())
2531 }
2532 CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2533 CatalogItem::MaterializedView(mview) => {
2534 let mut collections = mview.collections.clone();
2535 let global_id = collections
2536 .remove(&RelationVersion::root())
2537 .expect("at least one version");
2538 (mview.create_sql.clone(), global_id, collections)
2539 }
2540 CatalogItem::Index(index) => {
2541 (index.create_sql.clone(), index.global_id, BTreeMap::new())
2542 }
2543 CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2544 CatalogItem::Type(typ) => {
2545 let create_sql = typ
2546 .create_sql
2547 .clone()
2548 .expect("builtin types cannot be serialized");
2549 (create_sql, typ.global_id, BTreeMap::new())
2550 }
2551 CatalogItem::Secret(secret) => {
2552 (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2553 }
2554 CatalogItem::Connection(connection) => (
2555 connection.create_sql.clone(),
2556 connection.global_id,
2557 BTreeMap::new(),
2558 ),
2559 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2560 CatalogItem::ContinualTask(ct) => {
2561 (ct.create_sql.clone(), ct.global_id, BTreeMap::new())
2562 }
2563 }
2564 }
2565
2566 pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2567 match self {
2568 CatalogItem::Table(mut table) => {
2569 let create_sql = table
2570 .create_sql
2571 .expect("builtin tables cannot be serialized");
2572 let global_id = table
2573 .collections
2574 .remove(&RelationVersion::root())
2575 .expect("at least one version");
2576 (create_sql, global_id, table.collections)
2577 }
2578 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2579 CatalogItem::Source(source) => {
2580 assert!(
2581 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2582 "cannot serialize introspection/builtin sources",
2583 );
2584 let create_sql = source
2585 .create_sql
2586 .expect("builtin sources cannot be serialized");
2587 (create_sql, source.global_id, BTreeMap::new())
2588 }
2589 CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2590 CatalogItem::MaterializedView(mut mview) => {
2591 let global_id = mview
2592 .collections
2593 .remove(&RelationVersion::root())
2594 .expect("at least one version");
2595 (mview.create_sql, global_id, mview.collections)
2596 }
2597 CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2598 CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2599 CatalogItem::Type(typ) => {
2600 let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2601 (create_sql, typ.global_id, BTreeMap::new())
2602 }
2603 CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2604 CatalogItem::Connection(connection) => {
2605 (connection.create_sql, connection.global_id, BTreeMap::new())
2606 }
2607 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2608 CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
2609 }
2610 }
2611
2612 pub fn global_id_for_version(&self, version: RelationVersionSelector) -> Option<GlobalId> {
2615 let collections = match self {
2616 CatalogItem::MaterializedView(mv) => &mv.collections,
2617 CatalogItem::Table(table) => &table.collections,
2618 CatalogItem::Source(source) => return Some(source.global_id),
2619 CatalogItem::Log(log) => return Some(log.global_id),
2620 CatalogItem::View(view) => return Some(view.global_id),
2621 CatalogItem::Sink(sink) => return Some(sink.global_id),
2622 CatalogItem::Index(index) => return Some(index.global_id),
2623 CatalogItem::Type(ty) => return Some(ty.global_id),
2624 CatalogItem::Func(func) => return Some(func.global_id),
2625 CatalogItem::Secret(secret) => return Some(secret.global_id),
2626 CatalogItem::Connection(conn) => return Some(conn.global_id),
2627 CatalogItem::ContinualTask(ct) => return Some(ct.global_id),
2628 };
2629 match version {
2630 RelationVersionSelector::Latest => collections.values().last().copied(),
2631 RelationVersionSelector::Specific(version) => collections.get(&version).copied(),
2632 }
2633 }
2634}
2635
2636impl CatalogEntry {
2637 pub fn relation_desc_latest(&self) -> Option<Cow<'_, RelationDesc>> {
2640 self.item.relation_desc(RelationVersionSelector::Latest)
2641 }
2642
2643 pub fn has_columns(&self) -> bool {
2645 match self.item() {
2646 CatalogItem::Type(Type { details, .. }) => {
2647 matches!(details.typ, CatalogType::Record { .. })
2648 }
2649 _ => self.relation_desc_latest().is_some(),
2650 }
2651 }
2652
2653 pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2655 self.item.func(self)
2656 }
2657
2658 pub fn index(&self) -> Option<&Index> {
2660 match self.item() {
2661 CatalogItem::Index(idx) => Some(idx),
2662 _ => None,
2663 }
2664 }
2665
2666 pub fn materialized_view(&self) -> Option<&MaterializedView> {
2668 match self.item() {
2669 CatalogItem::MaterializedView(mv) => Some(mv),
2670 _ => None,
2671 }
2672 }
2673
2674 pub fn table(&self) -> Option<&Table> {
2676 match self.item() {
2677 CatalogItem::Table(tbl) => Some(tbl),
2678 _ => None,
2679 }
2680 }
2681
2682 pub fn source(&self) -> Option<&Source> {
2684 match self.item() {
2685 CatalogItem::Source(src) => Some(src),
2686 _ => None,
2687 }
2688 }
2689
2690 pub fn sink(&self) -> Option<&Sink> {
2692 match self.item() {
2693 CatalogItem::Sink(sink) => Some(sink),
2694 _ => None,
2695 }
2696 }
2697
2698 pub fn secret(&self) -> Option<&Secret> {
2700 match self.item() {
2701 CatalogItem::Secret(secret) => Some(secret),
2702 _ => None,
2703 }
2704 }
2705
2706 pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2707 match self.item() {
2708 CatalogItem::Connection(connection) => Ok(connection),
2709 _ => {
2710 let db_name = match self.name().qualifiers.database_spec {
2711 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2712 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2713 };
2714 Err(SqlCatalogError::UnknownConnection(format!(
2715 "{}{}.{}",
2716 db_name,
2717 self.name().qualifiers.schema_spec,
2718 self.name().item
2719 )))
2720 }
2721 }
2722 }
2723
2724 pub fn source_desc(
2727 &self,
2728 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2729 self.item.source_desc(self)
2730 }
2731
2732 pub fn is_connection(&self) -> bool {
2734 matches!(self.item(), CatalogItem::Connection(_))
2735 }
2736
2737 pub fn is_table(&self) -> bool {
2739 matches!(self.item(), CatalogItem::Table(_))
2740 }
2741
2742 pub fn is_source(&self) -> bool {
2745 matches!(self.item(), CatalogItem::Source(_))
2746 }
2747
2748 pub fn subsource_details(
2751 &self,
2752 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2753 match &self.item() {
2754 CatalogItem::Source(source) => match &source.data_source {
2755 DataSourceDesc::IngestionExport {
2756 ingestion_id,
2757 external_reference,
2758 details,
2759 data_config: _,
2760 } => Some((*ingestion_id, external_reference, details)),
2761 _ => None,
2762 },
2763 _ => None,
2764 }
2765 }
2766
2767 pub fn source_export_details(
2770 &self,
2771 ) -> Option<(
2772 CatalogItemId,
2773 &UnresolvedItemName,
2774 &SourceExportDetails,
2775 &SourceExportDataConfig<ReferencedConnection>,
2776 )> {
2777 match &self.item() {
2778 CatalogItem::Source(source) => match &source.data_source {
2779 DataSourceDesc::IngestionExport {
2780 ingestion_id,
2781 external_reference,
2782 details,
2783 data_config,
2784 } => Some((*ingestion_id, external_reference, details, data_config)),
2785 _ => None,
2786 },
2787 CatalogItem::Table(table) => match &table.data_source {
2788 TableDataSource::DataSource {
2789 desc:
2790 DataSourceDesc::IngestionExport {
2791 ingestion_id,
2792 external_reference,
2793 details,
2794 data_config,
2795 },
2796 timeline: _,
2797 } => Some((*ingestion_id, external_reference, details, data_config)),
2798 _ => None,
2799 },
2800 _ => None,
2801 }
2802 }
2803
2804 pub fn is_progress_source(&self) -> bool {
2806 self.item().is_progress_source()
2807 }
2808
2809 pub fn progress_id(&self) -> Option<CatalogItemId> {
2811 match &self.item() {
2812 CatalogItem::Source(source) => match &source.data_source {
2813 DataSourceDesc::Ingestion { .. } => Some(self.id),
2814 DataSourceDesc::OldSyntaxIngestion {
2815 progress_subsource, ..
2816 } => Some(*progress_subsource),
2817 DataSourceDesc::IngestionExport { .. }
2818 | DataSourceDesc::Introspection(_)
2819 | DataSourceDesc::Progress
2820 | DataSourceDesc::Webhook { .. }
2821 | DataSourceDesc::Catalog => None,
2822 },
2823 CatalogItem::Table(_)
2824 | CatalogItem::Log(_)
2825 | CatalogItem::View(_)
2826 | CatalogItem::MaterializedView(_)
2827 | CatalogItem::Sink(_)
2828 | CatalogItem::Index(_)
2829 | CatalogItem::Type(_)
2830 | CatalogItem::Func(_)
2831 | CatalogItem::Secret(_)
2832 | CatalogItem::Connection(_)
2833 | CatalogItem::ContinualTask(_) => None,
2834 }
2835 }
2836
2837 pub fn is_sink(&self) -> bool {
2839 matches!(self.item(), CatalogItem::Sink(_))
2840 }
2841
2842 pub fn is_materialized_view(&self) -> bool {
2844 matches!(self.item(), CatalogItem::MaterializedView(_))
2845 }
2846
2847 pub fn is_view(&self) -> bool {
2849 matches!(self.item(), CatalogItem::View(_))
2850 }
2851
2852 pub fn is_secret(&self) -> bool {
2854 matches!(self.item(), CatalogItem::Secret(_))
2855 }
2856
2857 pub fn is_introspection_source(&self) -> bool {
2859 matches!(self.item(), CatalogItem::Log(_))
2860 }
2861
2862 pub fn is_index(&self) -> bool {
2864 matches!(self.item(), CatalogItem::Index(_))
2865 }
2866
2867 pub fn is_continual_task(&self) -> bool {
2869 matches!(self.item(), CatalogItem::ContinualTask(_))
2870 }
2871
2872 pub fn is_relation(&self) -> bool {
2874 mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2875 }
2876
2877 pub fn references(&self) -> &ResolvedIds {
2880 self.item.references()
2881 }
2882
2883 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2889 self.item.uses()
2890 }
2891
2892 pub fn item(&self) -> &CatalogItem {
2894 &self.item
2895 }
2896
2897 pub fn id(&self) -> CatalogItemId {
2899 self.id
2900 }
2901
2902 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2904 self.item().global_ids()
2905 }
2906
2907 pub fn latest_global_id(&self) -> GlobalId {
2908 self.item().latest_global_id()
2909 }
2910
2911 pub fn oid(&self) -> u32 {
2913 self.oid
2914 }
2915
2916 pub fn name(&self) -> &QualifiedItemName {
2918 &self.name
2919 }
2920
2921 pub fn referenced_by(&self) -> &[CatalogItemId] {
2923 &self.referenced_by
2924 }
2925
2926 pub fn used_by(&self) -> &[CatalogItemId] {
2928 &self.used_by
2929 }
2930
2931 pub fn conn_id(&self) -> Option<&ConnectionId> {
2934 self.item.conn_id()
2935 }
2936
2937 pub fn owner_id(&self) -> &RoleId {
2939 &self.owner_id
2940 }
2941
2942 pub fn privileges(&self) -> &PrivilegeMap {
2944 &self.privileges
2945 }
2946
2947 pub fn comment_object_id(&self) -> CommentObjectId {
2949 use CatalogItemType::*;
2950 match self.item_type() {
2951 Table => CommentObjectId::Table(self.id),
2952 Source => CommentObjectId::Source(self.id),
2953 Sink => CommentObjectId::Sink(self.id),
2954 View => CommentObjectId::View(self.id),
2955 MaterializedView => CommentObjectId::MaterializedView(self.id),
2956 Index => CommentObjectId::Index(self.id),
2957 Func => CommentObjectId::Func(self.id),
2958 Connection => CommentObjectId::Connection(self.id),
2959 Type => CommentObjectId::Type(self.id),
2960 Secret => CommentObjectId::Secret(self.id),
2961 ContinualTask => CommentObjectId::ContinualTask(self.id),
2962 }
2963 }
2964}
2965
/// Stores comments, keyed first by the commented object and then by an
/// optional sub-component index.
// NOTE(review): the `Option<usize>` key presumably identifies a position
// within the object (e.g. a column), with `None` meaning the object itself —
// confirm against callers.
#[derive(Debug, Clone, Default)]
pub struct CommentsMap {
    /// Per-object comments; an object with no comments has no entry.
    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
}
2970
2971impl CommentsMap {
2972 pub fn update_comment(
2973 &mut self,
2974 object_id: CommentObjectId,
2975 sub_component: Option<usize>,
2976 comment: Option<String>,
2977 ) -> Option<String> {
2978 let object_comments = self.map.entry(object_id).or_default();
2979
2980 let (empty, prev) = if let Some(comment) = comment {
2982 let prev = object_comments.insert(sub_component, comment);
2983 (false, prev)
2984 } else {
2985 let prev = object_comments.remove(&sub_component);
2986 (object_comments.is_empty(), prev)
2987 };
2988
2989 if empty {
2991 self.map.remove(&object_id);
2992 }
2993
2994 prev
2996 }
2997
2998 pub fn drop_comments(
3004 &mut self,
3005 object_ids: &BTreeSet<CommentObjectId>,
3006 ) -> Vec<(CommentObjectId, Option<usize>, String)> {
3007 let mut removed_comments = Vec::new();
3008
3009 for object_id in object_ids {
3010 if let Some(comments) = self.map.remove(object_id) {
3011 let removed = comments
3012 .into_iter()
3013 .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
3014 removed_comments.extend(removed);
3015 }
3016 }
3017
3018 removed_comments
3019 }
3020
3021 pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
3022 self.map
3023 .iter()
3024 .map(|(id, comments)| {
3025 comments
3026 .iter()
3027 .map(|(pos, comment)| (*id, *pos, comment.as_str()))
3028 })
3029 .flatten()
3030 }
3031
3032 pub fn get_object_comments(
3033 &self,
3034 object_id: CommentObjectId,
3035 ) -> Option<&BTreeMap<Option<usize>, String>> {
3036 self.map.get(&object_id)
3037 }
3038}
3039
3040impl Serialize for CommentsMap {
3041 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
3042 where
3043 S: serde::Serializer,
3044 {
3045 let comment_count = self
3046 .map
3047 .iter()
3048 .map(|(_object_id, comments)| comments.len())
3049 .sum();
3050
3051 let mut seq = serializer.serialize_seq(Some(comment_count))?;
3052 for (object_id, sub) in &self.map {
3053 for (sub_component, comment) in sub {
3054 seq.serialize_element(&(
3055 format!("{object_id:?}"),
3056 format!("{sub_component:?}"),
3057 comment,
3058 ))?;
3059 }
3060 }
3061 seq.end()
3062 }
3063}
3064
/// The set of default privileges, keyed by the object description they apply
/// to.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
pub struct DefaultPrivileges {
    /// Per-object default privileges; the map keys are serialized as strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
}
3070
/// Newtype around a map from grantee role to that grantee's default privilege
/// ACL item.
// NOTE(review): presumably this wrapper exists so the `serialize_with`
// attribute can be applied to the inner map — confirm.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
struct RoleDefaultPrivileges(
    /// Default privileges keyed by grantee; the map keys are serialized as
    /// strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
);
3079
3080impl Deref for RoleDefaultPrivileges {
3081 type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
3082
3083 fn deref(&self) -> &Self::Target {
3084 &self.0
3085 }
3086}
3087
3088impl DerefMut for RoleDefaultPrivileges {
3089 fn deref_mut(&mut self) -> &mut Self::Target {
3090 &mut self.0
3091 }
3092}
3093
3094impl DefaultPrivileges {
3095 pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
3097 if privilege.acl_mode.is_empty() {
3098 return;
3099 }
3100
3101 let privileges = self.privileges.entry(object).or_default();
3102 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3103 default_privilege.acl_mode |= privilege.acl_mode;
3104 } else {
3105 privileges.insert(privilege.grantee, privilege);
3106 }
3107 }
3108
3109 pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
3111 if let Some(privileges) = self.privileges.get_mut(object) {
3112 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3113 default_privilege.acl_mode =
3114 default_privilege.acl_mode.difference(privilege.acl_mode);
3115 if default_privilege.acl_mode.is_empty() {
3116 privileges.remove(&privilege.grantee);
3117 }
3118 }
3119 if privileges.is_empty() {
3120 self.privileges.remove(object);
3121 }
3122 }
3123 }
3124
3125 pub fn get_privileges_for_grantee(
3128 &self,
3129 object: &DefaultPrivilegeObject,
3130 grantee: &RoleId,
3131 ) -> Option<&AclMode> {
3132 self.privileges
3133 .get(object)
3134 .and_then(|privileges| privileges.get(grantee))
3135 .map(|privilege| &privilege.acl_mode)
3136 }
3137
3138 pub fn get_applicable_privileges(
3140 &self,
3141 role_id: RoleId,
3142 database_id: Option<DatabaseId>,
3143 schema_id: Option<SchemaId>,
3144 object_type: mz_sql::catalog::ObjectType,
3145 ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
3146 let privilege_object_type = if object_type.is_relation() {
3150 mz_sql::catalog::ObjectType::Table
3151 } else {
3152 object_type
3153 };
3154 let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
3155
3156 [
3160 DefaultPrivilegeObject {
3161 role_id,
3162 database_id,
3163 schema_id,
3164 object_type: privilege_object_type,
3165 },
3166 DefaultPrivilegeObject {
3167 role_id,
3168 database_id,
3169 schema_id: None,
3170 object_type: privilege_object_type,
3171 },
3172 DefaultPrivilegeObject {
3173 role_id,
3174 database_id: None,
3175 schema_id: None,
3176 object_type: privilege_object_type,
3177 },
3178 DefaultPrivilegeObject {
3179 role_id: RoleId::Public,
3180 database_id,
3181 schema_id,
3182 object_type: privilege_object_type,
3183 },
3184 DefaultPrivilegeObject {
3185 role_id: RoleId::Public,
3186 database_id,
3187 schema_id: None,
3188 object_type: privilege_object_type,
3189 },
3190 DefaultPrivilegeObject {
3191 role_id: RoleId::Public,
3192 database_id: None,
3193 schema_id: None,
3194 object_type: privilege_object_type,
3195 },
3196 ]
3197 .into_iter()
3198 .filter_map(|object| self.privileges.get(&object))
3199 .flat_map(|acl_map| acl_map.values())
3200 .fold(
3202 BTreeMap::new(),
3203 |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
3204 let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
3205 *accum_acl_mode |= *acl_mode;
3206 accum
3207 },
3208 )
3209 .into_iter()
3210 .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
3215 .filter(|(_, acl_mode)| !acl_mode.is_empty())
3217 .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
3218 grantee: *grantee,
3219 acl_mode,
3220 })
3221 }
3222
3223 pub fn iter(
3224 &self,
3225 ) -> impl Iterator<
3226 Item = (
3227 &DefaultPrivilegeObject,
3228 impl Iterator<Item = &DefaultPrivilegeAclItem>,
3229 ),
3230 > {
3231 self.privileges
3232 .iter()
3233 .map(|(object, acl_map)| (object, acl_map.values()))
3234 }
3235}
3236
/// In-memory configuration of a cluster.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterConfig {
    /// Whether the cluster is managed or unmanaged, plus the managed settings.
    pub variant: ClusterVariant,
    /// Optional workload class string attached to the cluster.
    pub workload_class: Option<String>,
}
3242
3243impl ClusterConfig {
3244 pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
3245 match &self.variant {
3246 ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
3247 ClusterVariant::Unmanaged => None,
3248 }
3249 }
3250}
3251
3252impl From<ClusterConfig> for durable::ClusterConfig {
3253 fn from(config: ClusterConfig) -> Self {
3254 Self {
3255 variant: config.variant.into(),
3256 workload_class: config.workload_class,
3257 }
3258 }
3259}
3260
3261impl From<durable::ClusterConfig> for ClusterConfig {
3262 fn from(config: durable::ClusterConfig) -> Self {
3263 Self {
3264 variant: config.variant.into(),
3265 workload_class: config.workload_class,
3266 }
3267 }
3268}
3269
/// Settings of a managed cluster.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterVariantManaged {
    /// Size of the cluster, as a string.
    pub size: String,
    /// Availability zones configured for the cluster.
    pub availability_zones: Vec<String>,
    /// Replica logging configuration.
    pub logging: ReplicaLogging,
    /// Replication factor of the cluster.
    pub replication_factor: u32,
    /// Optimizer feature overrides that apply to the cluster.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// Schedule of the cluster.
    pub schedule: ClusterSchedule,
}
3279
3280impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3281 fn from(managed: ClusterVariantManaged) -> Self {
3282 Self {
3283 size: managed.size,
3284 availability_zones: managed.availability_zones,
3285 logging: managed.logging,
3286 replication_factor: managed.replication_factor,
3287 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3288 schedule: managed.schedule,
3289 }
3290 }
3291}
3292
3293impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3294 fn from(managed: durable::ClusterVariantManaged) -> Self {
3295 Self {
3296 size: managed.size,
3297 availability_zones: managed.availability_zones,
3298 logging: managed.logging,
3299 replication_factor: managed.replication_factor,
3300 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3301 schedule: managed.schedule,
3302 }
3303 }
3304}
3305
/// Distinguishes managed clusters from unmanaged ones.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterVariant {
    /// A managed cluster together with its settings.
    Managed(ClusterVariantManaged),
    /// An unmanaged cluster; it carries no settings here.
    Unmanaged,
}
3311
3312impl From<ClusterVariant> for durable::ClusterVariant {
3313 fn from(variant: ClusterVariant) -> Self {
3314 match variant {
3315 ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3316 ClusterVariant::Unmanaged => Self::Unmanaged,
3317 }
3318 }
3319}
3320
3321impl From<durable::ClusterVariant> for ClusterVariant {
3322 fn from(variant: durable::ClusterVariant) -> Self {
3323 match variant {
3324 durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3325 durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3326 }
3327 }
3328}
3329
/// Exposes an in-memory `Database` through the SQL layer's `CatalogDatabase`
/// trait.
impl mz_sql::catalog::CatalogDatabase for Database {
    /// Returns the database's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the database's id.
    fn id(&self) -> DatabaseId {
        self.id
    }

    /// Reports whether the database contains at least one schema.
    fn has_schemas(&self) -> bool {
        !self.schemas_by_name.is_empty()
    }

    /// Returns the map from schema name to schema id.
    fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
        &self.schemas_by_name
    }

    /// Returns every schema of the database as a trait object.
    // The `as` cast only converts a concrete reference into a trait object.
    #[allow(clippy::as_conversions)]
    fn schemas(&self) -> Vec<&dyn CatalogSchema> {
        self.schemas_by_id
            .values()
            .map(|schema| schema as &dyn CatalogSchema)
            .collect()
    }

    /// Returns the role that owns the database.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges stored on the database.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3364
/// Exposes an in-memory `Schema` through the SQL layer's `CatalogSchema`
/// trait.
impl mz_sql::catalog::CatalogSchema for Schema {
    /// Returns the database specifier taken from the schema's name.
    fn database(&self) -> &ResolvedDatabaseSpecifier {
        &self.name.database
    }

    /// Returns the schema's qualified name.
    fn name(&self) -> &QualifiedSchemaName {
        &self.name
    }

    /// Returns the schema's specifier.
    fn id(&self) -> &SchemaSpecifier {
        &self.id
    }

    /// Reports whether the schema's `items` collection is non-empty.
    // NOTE(review): only `items` is consulted here, while `item_ids` also
    // yields `functions` and `types` — confirm that this is intended.
    fn has_items(&self) -> bool {
        !self.items.is_empty()
    }

    /// Iterates over the ids stored in `items`, then `functions`, then
    /// `types`.
    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
        Box::new(
            self.items
                .values()
                .chain(self.functions.values())
                .chain(self.types.values())
                .copied(),
        )
    }

    /// Returns the role that owns the schema.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges stored on the schema.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3400
/// Exposes an in-memory `Role` through the SQL layer's `CatalogRole` trait.
impl mz_sql::catalog::CatalogRole for Role {
    /// Returns the role's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the role's id.
    fn id(&self) -> RoleId {
        self.id
    }

    /// Returns the role's membership map.
    // NOTE(review): presumably maps the role this one is a member of to the
    // role that granted the membership — confirm against `RoleMembership`.
    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
        &self.membership.map
    }

    /// Returns the role's attributes.
    fn attributes(&self) -> &RoleAttributes {
        &self.attributes
    }

    /// Returns the role's variables, keyed by variable name.
    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
        &self.vars.map
    }
}
3422
/// Exposes an in-memory `NetworkPolicy` through the SQL layer's
/// `CatalogNetworkPolicy` trait.
impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
    /// Returns the policy's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the policy's id.
    fn id(&self) -> NetworkPolicyId {
        self.id
    }

    /// Returns the role that owns the policy.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges stored on the policy.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3440
3441impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
3442 fn name(&self) -> &str {
3443 &self.name
3444 }
3445
3446 fn id(&self) -> ClusterId {
3447 self.id
3448 }
3449
3450 fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
3451 &self.bound_objects
3452 }
3453
3454 fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
3455 &self.replica_id_by_name_
3456 }
3457
3458 #[allow(clippy::as_conversions)]
3460 fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
3461 self.replicas()
3462 .map(|replica| replica as &dyn CatalogClusterReplica)
3463 .collect()
3464 }
3465
3466 fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
3467 self.replica(id).expect("catalog out of sync")
3468 }
3469
3470 fn owner_id(&self) -> RoleId {
3471 self.owner_id
3472 }
3473
3474 fn privileges(&self) -> &PrivilegeMap {
3475 &self.privileges
3476 }
3477
3478 fn is_managed(&self) -> bool {
3479 self.is_managed()
3480 }
3481
3482 fn managed_size(&self) -> Option<&str> {
3483 match &self.config.variant {
3484 ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
3485 ClusterVariant::Unmanaged => None,
3486 }
3487 }
3488
3489 fn schedule(&self) -> Option<&ClusterSchedule> {
3490 match &self.config.variant {
3491 ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
3492 ClusterVariant::Unmanaged => None,
3493 }
3494 }
3495
3496 fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
3497 self.try_to_plan()
3498 }
3499}
3500
/// Exposes an in-memory `ClusterReplica` through the SQL layer's
/// `CatalogClusterReplica` trait.
impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
    /// Returns the replica's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the id of the cluster the replica belongs to.
    fn cluster_id(&self) -> ClusterId {
        self.cluster_id
    }

    /// Returns the replica's id.
    fn replica_id(&self) -> ReplicaId {
        self.replica_id
    }

    /// Returns the role that owns the replica.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Reports whether the replica's configured location is internal.
    fn internal(&self) -> bool {
        self.config.location.internal()
    }
}
3522
3523impl mz_sql::catalog::CatalogItem for CatalogEntry {
3524 fn name(&self) -> &QualifiedItemName {
3525 self.name()
3526 }
3527
3528 fn id(&self) -> CatalogItemId {
3529 self.id()
3530 }
3531
3532 fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3533 Box::new(self.global_ids())
3534 }
3535
3536 fn oid(&self) -> u32 {
3537 self.oid()
3538 }
3539
3540 fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3541 self.func()
3542 }
3543
3544 fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3545 self.source_desc()
3546 }
3547
3548 fn connection(
3549 &self,
3550 ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3551 {
3552 Ok(self.connection()?.details.to_connection())
3553 }
3554
3555 fn create_sql(&self) -> &str {
3556 match self.item() {
3557 CatalogItem::Table(Table { create_sql, .. }) => {
3558 create_sql.as_deref().unwrap_or("<builtin>")
3559 }
3560 CatalogItem::Source(Source { create_sql, .. }) => {
3561 create_sql.as_deref().unwrap_or("<builtin>")
3562 }
3563 CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3564 CatalogItem::View(View { create_sql, .. }) => create_sql,
3565 CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3566 CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3567 CatalogItem::Type(Type { create_sql, .. }) => {
3568 create_sql.as_deref().unwrap_or("<builtin>")
3569 }
3570 CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3571 CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3572 CatalogItem::Func(_) => "<builtin>",
3573 CatalogItem::Log(_) => "<builtin>",
3574 CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
3575 }
3576 }
3577
3578 fn item_type(&self) -> SqlCatalogItemType {
3579 self.item().typ()
3580 }
3581
3582 fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3583 if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3584 Some((keys, *on))
3585 } else {
3586 None
3587 }
3588 }
3589
3590 fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3591 if let CatalogItem::Table(Table {
3592 data_source: TableDataSource::TableWrites { defaults },
3593 ..
3594 }) = self.item()
3595 {
3596 Some(defaults.as_slice())
3597 } else {
3598 None
3599 }
3600 }
3601
3602 fn replacement_target(&self) -> Option<CatalogItemId> {
3603 if let CatalogItem::MaterializedView(mv) = self.item() {
3604 mv.replacement_target
3605 } else {
3606 None
3607 }
3608 }
3609
3610 fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3611 if let CatalogItem::Type(Type { details, .. }) = self.item() {
3612 Some(details)
3613 } else {
3614 None
3615 }
3616 }
3617
3618 fn references(&self) -> &ResolvedIds {
3619 self.references()
3620 }
3621
3622 fn uses(&self) -> BTreeSet<CatalogItemId> {
3623 self.uses()
3624 }
3625
3626 fn referenced_by(&self) -> &[CatalogItemId] {
3627 self.referenced_by()
3628 }
3629
3630 fn used_by(&self) -> &[CatalogItemId] {
3631 self.used_by()
3632 }
3633
3634 fn subsource_details(
3635 &self,
3636 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3637 self.subsource_details()
3638 }
3639
3640 fn source_export_details(
3641 &self,
3642 ) -> Option<(
3643 CatalogItemId,
3644 &UnresolvedItemName,
3645 &SourceExportDetails,
3646 &SourceExportDataConfig<ReferencedConnection>,
3647 )> {
3648 self.source_export_details()
3649 }
3650
3651 fn is_progress_source(&self) -> bool {
3652 self.is_progress_source()
3653 }
3654
3655 fn progress_id(&self) -> Option<CatalogItemId> {
3656 self.progress_id()
3657 }
3658
3659 fn owner_id(&self) -> RoleId {
3660 self.owner_id
3661 }
3662
3663 fn privileges(&self) -> &PrivilegeMap {
3664 &self.privileges
3665 }
3666
3667 fn cluster_id(&self) -> Option<ClusterId> {
3668 self.item().cluster_id()
3669 }
3670
3671 fn at_version(
3672 &self,
3673 version: RelationVersionSelector,
3674 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3675 Box::new(CatalogCollectionEntry {
3676 entry: self.clone(),
3677 version,
3678 })
3679 }
3680
3681 fn latest_version(&self) -> Option<RelationVersion> {
3682 self.table().map(|t| t.desc.latest_version())
3683 }
3684}
3685
/// A single update to catalog state: what changed, at which timestamp, and
/// whether it was added or retracted.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StateUpdate {
    /// The piece of state the update carries.
    pub kind: StateUpdateKind,
    /// The timestamp of the update.
    pub ts: Timestamp,
    /// Whether the update is an addition or a retraction.
    pub diff: StateDiff,
}
3693
/// The kinds of catalog state a `StateUpdate` can carry.
///
/// Most variants wrap the corresponding durable object; `SystemPrivilege`
/// wraps an `MzAclItem` and `TemporaryItem` wraps the in-memory
/// `TemporaryItem` type.
// NOTE(review): the derived `Ord` follows variant declaration order, so
// reordering variants changes how updates sort — confirm whether callers
// depend on that order before rearranging.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum StateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    /// An item described by the in-memory `TemporaryItem` type rather than a
    /// durable object.
    TemporaryItem(TemporaryItem),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3723
/// The direction of a [`StateUpdate`]: an object is either removed or added.
///
/// Converts to [`Diff`] as `Diff::MINUS_ONE` / `Diff::ONE`. Under the derived
/// ordering, `Retraction` sorts before `Addition`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub enum StateDiff {
    /// The update removes the object (maps to `Diff::MINUS_ONE`).
    Retraction,
    /// The update adds the object (maps to `Diff::ONE`).
    Addition,
}
3730
3731impl From<StateDiff> for Diff {
3732 fn from(diff: StateDiff) -> Self {
3733 match diff {
3734 StateDiff::Retraction => Diff::MINUS_ONE,
3735 StateDiff::Addition => Diff::ONE,
3736 }
3737 }
3738}
3739impl TryFrom<Diff> for StateDiff {
3740 type Error = String;
3741
3742 fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3743 match diff {
3744 Diff::MINUS_ONE => Ok(Self::Retraction),
3745 Diff::ONE => Ok(Self::Addition),
3746 diff => Err(format!("invalid diff {diff}")),
3747 }
3748 }
3749}
3750
/// A flattened, owned description of a catalog item, carried by
/// `StateUpdateKind::TemporaryItem`.
///
/// It is built from a [`CatalogEntry`] via the `From` impl in this file. The
/// field notes below describe where that conversion takes each value from.
#[derive(Debug, Clone, Ord, PartialOrd, PartialEq, Eq)]
pub struct TemporaryItem {
    /// The entry's `id`.
    pub id: CatalogItemId,
    /// The entry's `oid`.
    pub oid: u32,
    /// The `GlobalId` returned by the item's `to_serialized()`.
    pub global_id: GlobalId,
    /// Converted from the `schema_spec` qualifier of the entry's name.
    pub schema_id: SchemaId,
    /// The `item` component of the entry's name (qualifiers are not included).
    pub name: String,
    /// A clone of the entry's `conn_id()`, when it has one.
    pub conn_id: Option<ConnectionId>,
    /// The SQL text returned by the item's `to_serialized()`; `item_type()`
    /// derives the item type from this string.
    pub create_sql: String,
    /// The entry's `owner_id`.
    pub owner_id: RoleId,
    /// Every value of the entry's privilege map, collected into a flat list.
    pub privileges: Vec<MzAclItem>,
    /// The per-version `GlobalId` map returned by the item's `to_serialized()`.
    pub extra_versions: BTreeMap<RelationVersion, GlobalId>,
}
3765
3766impl From<CatalogEntry> for TemporaryItem {
3767 fn from(entry: CatalogEntry) -> Self {
3768 let conn_id = entry.conn_id().cloned();
3769 let (create_sql, global_id, extra_versions) = entry.item.to_serialized();
3770
3771 TemporaryItem {
3772 id: entry.id,
3773 oid: entry.oid,
3774 global_id,
3775 schema_id: entry.name.qualifiers.schema_spec.into(),
3776 name: entry.name.item,
3777 conn_id,
3778 create_sql,
3779 owner_id: entry.owner_id,
3780 privileges: entry.privileges.into_all_values().collect(),
3781 extra_versions,
3782 }
3783 }
3784}
3785
3786impl TemporaryItem {
3787 pub fn item_type(&self) -> CatalogItemType {
3788 item_type(&self.create_sql)
3789 }
3790}
3791
/// The subset of [`StateUpdateKind`] that excludes `TemporaryItem`.
///
/// Every variant here has a same-named counterpart in [`StateUpdateKind`]
/// carrying the same payload type. Conversion into [`StateUpdateKind`] is
/// infallible; conversion back fails only for `StateUpdateKind::TemporaryItem`.
///
/// `PartialOrd`/`Ord` are derived, so the declaration order of the variants
/// below is also their sort order.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum BootstrapStateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3815
3816impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3817 fn from(value: BootstrapStateUpdateKind) -> Self {
3818 match value {
3819 BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3820 BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3821 BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3822 BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3823 BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3824 StateUpdateKind::DefaultPrivilege(kind)
3825 }
3826 BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3827 StateUpdateKind::SystemPrivilege(kind)
3828 }
3829 BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3830 StateUpdateKind::SystemConfiguration(kind)
3831 }
3832 BootstrapStateUpdateKind::SourceReferences(kind) => {
3833 StateUpdateKind::SourceReferences(kind)
3834 }
3835 BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3836 BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3837 BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3838 StateUpdateKind::IntrospectionSourceIndex(kind)
3839 }
3840 BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3841 BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3842 StateUpdateKind::SystemObjectMapping(kind)
3843 }
3844 BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3845 BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3846 BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3847 BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3848 StateUpdateKind::StorageCollectionMetadata(kind)
3849 }
3850 BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3851 StateUpdateKind::UnfinalizedShard(kind)
3852 }
3853 }
3854 }
3855}
3856
3857impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3858 type Error = TemporaryItem;
3859
3860 fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3861 match value {
3862 StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3863 StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3864 StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3865 StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3866 StateUpdateKind::DefaultPrivilege(kind) => {
3867 Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3868 }
3869 StateUpdateKind::SystemPrivilege(kind) => {
3870 Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3871 }
3872 StateUpdateKind::SystemConfiguration(kind) => {
3873 Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3874 }
3875 StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3876 StateUpdateKind::NetworkPolicy(kind) => {
3877 Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3878 }
3879 StateUpdateKind::IntrospectionSourceIndex(kind) => {
3880 Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3881 }
3882 StateUpdateKind::ClusterReplica(kind) => {
3883 Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3884 }
3885 StateUpdateKind::SourceReferences(kind) => {
3886 Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3887 }
3888 StateUpdateKind::SystemObjectMapping(kind) => {
3889 Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3890 }
3891 StateUpdateKind::TemporaryItem(kind) => Err(kind),
3892 StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3893 StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3894 StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3895 StateUpdateKind::StorageCollectionMetadata(kind) => {
3896 Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3897 }
3898 StateUpdateKind::UnfinalizedShard(kind) => {
3899 Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3900 }
3901 }
3902 }
3903}