1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_compute_client::logging::LogVariant;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_compute_types::plan::Plan as ComputePlan;
26use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
27use mz_controller_types::{ClusterId, ReplicaId};
28use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
29use mz_ore::collections::CollectionExt;
30use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
31use mz_repr::network_policy_id::NetworkPolicyId;
32use mz_repr::optimize::OptimizerFeatureOverrides;
33use mz_repr::refresh_schedule::RefreshSchedule;
34use mz_repr::role_id::RoleId;
35use mz_repr::{
36 CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
37 RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
38};
39use mz_sql::ast::display::AstDisplay;
40use mz_sql::ast::{
41 ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
42 UnresolvedItemName, Value, WithOptionValue,
43};
44use mz_sql::catalog::{
45 CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
46 CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogType,
47 CatalogTypeDetails, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference,
48 RoleAttributes, RoleMembership, RoleVars, SystemObjectType,
49};
50use mz_sql::names::{
51 Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
52 QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
53};
54use mz_sql::plan::{
55 AutoScalingStrategy, ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig,
56 ConnectionDetails, CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant,
57 CreateSourcePlan, HirRelationExpr, NetworkPolicyRule, OnTimeoutAction, PlanError,
58 WebhookBodyFormat, WebhookHeaders, WebhookValidation,
59};
60use mz_sql::rbac;
61use mz_sql::session::vars::OwnedVarInput;
62use mz_storage_client::controller::IntrospectionType;
63use mz_storage_types::connections::inline::ReferencedConnection;
64use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
65use mz_storage_types::sources::load_generator::LoadGenerator;
66use mz_storage_types::sources::{
67 GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
68 SourceExportDetails, Timeline,
69};
70use mz_transform::dataflow::DataflowMetainfo;
71use mz_transform::notice::OptimizerNotice;
72use serde::ser::SerializeSeq;
73use serde::{Deserialize, Serialize};
74use timely::progress::Antichain;
75use tracing::debug;
76
77use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
78use crate::durable;
79use crate::durable::objects::item_type;
80
/// Refreshes an existing value in place from a `T`.
///
/// Implementors must also be constructible from `T` (the `From<T>` supertrait), so the same
/// source type can either build a new value or update an existing one.
pub trait UpdateFrom<T>
where
    Self: From<T>,
{
    /// Overwrites the state of `self` that is derived from `from`.
    fn update_from(&mut self, from: T);
}
85
86#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
87pub struct Database {
88 pub name: String,
89 pub id: DatabaseId,
90 pub oid: u32,
91 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
92 pub schemas_by_id: BTreeMap<SchemaId, Schema>,
93 pub schemas_by_name: BTreeMap<String, SchemaId>,
94 pub owner_id: RoleId,
95 pub privileges: PrivilegeMap,
96}
97
98impl From<Database> for durable::Database {
99 fn from(database: Database) -> durable::Database {
100 durable::Database {
101 id: database.id,
102 oid: database.oid,
103 name: database.name,
104 owner_id: database.owner_id,
105 privileges: database.privileges.into_all_values().collect(),
106 }
107 }
108}
109
110impl From<durable::Database> for Database {
111 fn from(
112 durable::Database {
113 id,
114 oid,
115 name,
116 owner_id,
117 privileges,
118 }: durable::Database,
119 ) -> Database {
120 Database {
121 id,
122 oid,
123 schemas_by_id: BTreeMap::new(),
124 schemas_by_name: BTreeMap::new(),
125 name,
126 owner_id,
127 privileges: PrivilegeMap::from_mz_acl_items(privileges),
128 }
129 }
130}
131
132impl UpdateFrom<durable::Database> for Database {
133 fn update_from(
134 &mut self,
135 durable::Database {
136 id,
137 oid,
138 name,
139 owner_id,
140 privileges,
141 }: durable::Database,
142 ) {
143 self.id = id;
144 self.oid = oid;
145 self.name = name;
146 self.owner_id = owner_id;
147 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
148 }
149}
150
151#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
152pub struct Schema {
153 pub name: QualifiedSchemaName,
154 pub id: SchemaSpecifier,
155 pub oid: u32,
156 pub items: BTreeMap<String, CatalogItemId>,
157 pub functions: BTreeMap<String, CatalogItemId>,
158 pub types: BTreeMap<String, CatalogItemId>,
159 pub owner_id: RoleId,
160 pub privileges: PrivilegeMap,
161}
162
163impl From<Schema> for durable::Schema {
164 fn from(schema: Schema) -> durable::Schema {
165 durable::Schema {
166 id: schema.id.into(),
167 oid: schema.oid,
168 name: schema.name.schema,
169 database_id: schema.name.database.id(),
170 owner_id: schema.owner_id,
171 privileges: schema.privileges.into_all_values().collect(),
172 }
173 }
174}
175
176impl From<durable::Schema> for Schema {
177 fn from(
178 durable::Schema {
179 id,
180 oid,
181 name,
182 database_id,
183 owner_id,
184 privileges,
185 }: durable::Schema,
186 ) -> Schema {
187 Schema {
188 name: QualifiedSchemaName {
189 database: database_id.into(),
190 schema: name,
191 },
192 id: id.into(),
193 oid,
194 items: BTreeMap::new(),
195 functions: BTreeMap::new(),
196 types: BTreeMap::new(),
197 owner_id,
198 privileges: PrivilegeMap::from_mz_acl_items(privileges),
199 }
200 }
201}
202
203impl UpdateFrom<durable::Schema> for Schema {
204 fn update_from(
205 &mut self,
206 durable::Schema {
207 id,
208 oid,
209 name,
210 database_id,
211 owner_id,
212 privileges,
213 }: durable::Schema,
214 ) {
215 self.name = QualifiedSchemaName {
216 database: database_id.into(),
217 schema: name,
218 };
219 self.id = id.into();
220 self.oid = oid;
221 self.owner_id = owner_id;
222 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
223 }
224}
225
226#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
227pub struct Role {
228 pub name: String,
229 pub id: RoleId,
230 pub oid: u32,
231 pub attributes: RoleAttributes,
232 pub membership: RoleMembership,
233 pub vars: RoleVars,
234}
235
236impl Role {
237 pub fn is_user(&self) -> bool {
238 self.id.is_user()
239 }
240
241 pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
242 self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
243 }
244}
245
246impl From<Role> for durable::Role {
247 fn from(role: Role) -> durable::Role {
248 durable::Role {
249 id: role.id,
250 oid: role.oid,
251 name: role.name,
252 attributes: role.attributes,
253 membership: role.membership,
254 vars: role.vars,
255 }
256 }
257}
258
259impl From<durable::Role> for Role {
260 fn from(
261 durable::Role {
262 id,
263 oid,
264 name,
265 attributes,
266 membership,
267 vars,
268 }: durable::Role,
269 ) -> Self {
270 Role {
271 name,
272 id,
273 oid,
274 attributes,
275 membership,
276 vars,
277 }
278 }
279}
280
281impl UpdateFrom<durable::Role> for Role {
282 fn update_from(
283 &mut self,
284 durable::Role {
285 id,
286 oid,
287 name,
288 attributes,
289 membership,
290 vars,
291 }: durable::Role,
292 ) {
293 self.id = id;
294 self.oid = oid;
295 self.name = name;
296 self.attributes = attributes;
297 self.membership = membership;
298 self.vars = vars;
299 }
300}
301
302#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
303pub struct RoleAuth {
304 pub role_id: RoleId,
305 pub password_hash: Option<String>,
306 pub updated_at: u64,
307}
308
309impl From<RoleAuth> for durable::RoleAuth {
310 fn from(role_auth: RoleAuth) -> durable::RoleAuth {
311 durable::RoleAuth {
312 role_id: role_auth.role_id,
313 password_hash: role_auth.password_hash,
314 updated_at: role_auth.updated_at,
315 }
316 }
317}
318
319impl From<durable::RoleAuth> for RoleAuth {
320 fn from(
321 durable::RoleAuth {
322 role_id,
323 password_hash,
324 updated_at,
325 }: durable::RoleAuth,
326 ) -> RoleAuth {
327 RoleAuth {
328 role_id,
329 password_hash,
330 updated_at,
331 }
332 }
333}
334
335impl UpdateFrom<durable::RoleAuth> for RoleAuth {
336 fn update_from(&mut self, from: durable::RoleAuth) {
337 self.role_id = from.role_id;
338 self.password_hash = from.password_hash;
339 self.updated_at = from.updated_at;
340 }
341}
342
343#[derive(Debug, Serialize, Clone, PartialEq)]
344pub struct Cluster {
345 pub name: String,
346 pub id: ClusterId,
347 pub config: ClusterConfig,
348 #[serde(skip)]
349 pub log_indexes: BTreeMap<LogVariant, GlobalId>,
350 pub bound_objects: BTreeSet<CatalogItemId>,
353 pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
354 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
355 pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
356 pub owner_id: RoleId,
357 pub privileges: PrivilegeMap,
358}
359
360impl Cluster {
361 pub fn role(&self) -> ClusterRole {
363 if self.name == MZ_SYSTEM_CLUSTER.name {
366 ClusterRole::SystemCritical
367 } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
368 ClusterRole::System
369 } else {
370 ClusterRole::User
371 }
372 }
373
374 pub fn is_managed(&self) -> bool {
376 matches!(self.config.variant, ClusterVariant::Managed { .. })
377 }
378
379 pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
381 self.replicas().filter(|r| !r.config.location.internal())
382 }
383
384 pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
386 self.replicas_by_id_.values()
387 }
388
389 pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
391 self.replicas_by_id_.get(&replica_id)
392 }
393
394 pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
396 self.replica_id_by_name_.get(name).copied()
397 }
398
399 pub fn availability_zones(&self) -> Option<&[String]> {
401 match &self.config.variant {
402 ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
403 ClusterVariant::Unmanaged => None,
404 }
405 }
406
407 pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
408 let name = self.name.clone();
409 let variant = match &self.config.variant {
410 ClusterVariant::Managed(ClusterVariantManaged {
411 size,
412 availability_zones,
413 logging,
414 replication_factor,
415 optimizer_feature_overrides,
416 schedule,
417 auto_scaling_strategy: _,
420 reconfiguration: _,
421 burst: _,
422 }) => {
423 let introspection = match logging {
424 ReplicaLogging {
425 log_logging,
426 interval: Some(interval),
427 } => Some(ComputeReplicaIntrospectionConfig {
428 debugging: *log_logging,
429 interval: interval.clone(),
430 }),
431 ReplicaLogging {
432 log_logging: _,
433 interval: None,
434 } => None,
435 };
436 let compute = ComputeReplicaConfig { introspection };
437 CreateClusterVariant::Managed(CreateClusterManagedPlan {
438 replication_factor: replication_factor.clone(),
439 size: size.clone(),
440 availability_zones: availability_zones.clone(),
441 compute,
442 optimizer_feature_overrides: optimizer_feature_overrides.clone(),
443 schedule: schedule.clone(),
444 })
445 }
446 ClusterVariant::Unmanaged => {
447 return Err(PlanError::Unsupported {
450 feature: "SHOW CREATE for unmanaged clusters".to_string(),
451 discussion_no: None,
452 });
453 }
454 };
455 let workload_class = self.config.workload_class.clone();
456 Ok(CreateClusterPlan {
457 name,
458 variant,
459 workload_class,
460 })
461 }
462}
463
464impl From<Cluster> for durable::Cluster {
465 fn from(cluster: Cluster) -> durable::Cluster {
466 durable::Cluster {
467 id: cluster.id,
468 name: cluster.name,
469 owner_id: cluster.owner_id,
470 privileges: cluster.privileges.into_all_values().collect(),
471 config: cluster.config.into(),
472 }
473 }
474}
475
476impl From<durable::Cluster> for Cluster {
477 fn from(
478 durable::Cluster {
479 id,
480 name,
481 owner_id,
482 privileges,
483 config,
484 }: durable::Cluster,
485 ) -> Self {
486 Cluster {
487 name: name.clone(),
488 id,
489 bound_objects: BTreeSet::new(),
490 log_indexes: BTreeMap::new(),
491 replica_id_by_name_: BTreeMap::new(),
492 replicas_by_id_: BTreeMap::new(),
493 owner_id,
494 privileges: PrivilegeMap::from_mz_acl_items(privileges),
495 config: config.into(),
496 }
497 }
498}
499
500impl UpdateFrom<durable::Cluster> for Cluster {
501 fn update_from(
502 &mut self,
503 durable::Cluster {
504 id,
505 name,
506 owner_id,
507 privileges,
508 config,
509 }: durable::Cluster,
510 ) {
511 self.id = id;
512 self.name = name;
513 self.owner_id = owner_id;
514 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
515 self.config = config.into();
516 }
517}
518
519#[derive(Debug, Serialize, Clone, PartialEq)]
520pub struct ClusterReplica {
521 pub name: String,
522 pub cluster_id: ClusterId,
523 pub replica_id: ReplicaId,
524 pub config: ReplicaConfig,
525 pub owner_id: RoleId,
526}
527
528impl From<ClusterReplica> for durable::ClusterReplica {
529 fn from(replica: ClusterReplica) -> durable::ClusterReplica {
530 durable::ClusterReplica {
531 cluster_id: replica.cluster_id,
532 replica_id: replica.replica_id,
533 name: replica.name,
534 config: replica.config.into(),
535 owner_id: replica.owner_id,
536 }
537 }
538}
539
540#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
541pub struct ClusterReplicaProcessStatus {
542 pub status: ClusterStatus,
543 pub time: DateTime<Utc>,
544}
545
546#[derive(Debug, Serialize, Clone, PartialEq)]
547pub struct SourceReferences {
548 pub updated_at: u64,
549 pub references: Vec<SourceReference>,
550}
551
552#[derive(Debug, Serialize, Clone, PartialEq)]
553pub struct SourceReference {
554 pub name: String,
555 pub namespace: Option<String>,
556 pub columns: Vec<String>,
557}
558
559impl From<SourceReference> for durable::SourceReference {
560 fn from(source_reference: SourceReference) -> durable::SourceReference {
561 durable::SourceReference {
562 name: source_reference.name,
563 namespace: source_reference.namespace,
564 columns: source_reference.columns,
565 }
566 }
567}
568
569impl SourceReferences {
570 pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
571 durable::SourceReferences {
572 source_id,
573 updated_at: self.updated_at,
574 references: self.references.into_iter().map(Into::into).collect(),
575 }
576 }
577}
578
579impl From<durable::SourceReference> for SourceReference {
580 fn from(source_reference: durable::SourceReference) -> SourceReference {
581 SourceReference {
582 name: source_reference.name,
583 namespace: source_reference.namespace,
584 columns: source_reference.columns,
585 }
586 }
587}
588
589impl From<durable::SourceReferences> for SourceReferences {
590 fn from(source_references: durable::SourceReferences) -> SourceReferences {
591 SourceReferences {
592 updated_at: source_references.updated_at,
593 references: source_references
594 .references
595 .into_iter()
596 .map(|source_reference| source_reference.into())
597 .collect(),
598 }
599 }
600}
601
602impl From<mz_sql::plan::SourceReference> for SourceReference {
603 fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
604 SourceReference {
605 name: source_reference.name,
606 namespace: source_reference.namespace,
607 columns: source_reference.columns,
608 }
609 }
610}
611
612impl From<mz_sql::plan::SourceReferences> for SourceReferences {
613 fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
614 SourceReferences {
615 updated_at: source_references.updated_at,
616 references: source_references
617 .references
618 .into_iter()
619 .map(|source_reference| source_reference.into())
620 .collect(),
621 }
622 }
623}
624
625impl From<SourceReferences> for mz_sql::plan::SourceReferences {
626 fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
627 mz_sql::plan::SourceReferences {
628 updated_at: source_references.updated_at,
629 references: source_references
630 .references
631 .into_iter()
632 .map(|source_reference| source_reference.into())
633 .collect(),
634 }
635 }
636}
637
638impl From<SourceReference> for mz_sql::plan::SourceReference {
639 fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
640 mz_sql::plan::SourceReference {
641 name: source_reference.name,
642 namespace: source_reference.namespace,
643 columns: source_reference.columns,
644 }
645 }
646}
647
648#[derive(Clone, Debug, Serialize)]
649pub struct CatalogEntry {
650 pub item: CatalogItem,
651 #[serde(skip)]
652 pub referenced_by: Vec<CatalogItemId>,
653 #[serde(skip)]
657 pub used_by: Vec<CatalogItemId>,
658 pub id: CatalogItemId,
659 pub oid: u32,
660 pub name: QualifiedItemName,
661 pub owner_id: RoleId,
662 pub privileges: PrivilegeMap,
663}
664
665#[derive(Clone, Debug)]
680pub struct CatalogCollectionEntry {
681 pub entry: CatalogEntry,
682 pub version: RelationVersionSelector,
683}
684
685impl CatalogCollectionEntry {
686 pub fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
687 self.item().relation_desc(self.version)
688 }
689}
690
691impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
692 fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
693 self.item().relation_desc(self.version)
694 }
695
696 fn global_id(&self) -> GlobalId {
697 self.entry
698 .item()
699 .global_id_for_version(self.version)
700 .expect("catalog corruption, missing version!")
701 }
702}
703
704impl Deref for CatalogCollectionEntry {
705 type Target = CatalogEntry;
706
707 fn deref(&self) -> &CatalogEntry {
708 &self.entry
709 }
710}
711
712impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
713 fn name(&self) -> &QualifiedItemName {
714 self.entry.name()
715 }
716
717 fn id(&self) -> CatalogItemId {
718 self.entry.id()
719 }
720
721 fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
722 Box::new(self.entry.global_ids())
723 }
724
725 fn oid(&self) -> u32 {
726 self.entry.oid()
727 }
728
729 fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
730 self.entry.func()
731 }
732
733 fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
734 self.entry.source_desc()
735 }
736
737 fn connection(
738 &self,
739 ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
740 {
741 mz_sql::catalog::CatalogItem::connection(&self.entry)
742 }
743
744 fn create_sql(&self) -> &str {
745 self.entry.create_sql()
746 }
747
748 fn item_type(&self) -> SqlCatalogItemType {
749 self.entry.item_type()
750 }
751
752 fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
753 self.entry.index_details()
754 }
755
756 fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
757 self.entry.writable_table_details()
758 }
759
760 fn replacement_target(&self) -> Option<CatalogItemId> {
761 self.entry.replacement_target()
762 }
763
764 fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
765 self.entry.type_details()
766 }
767
768 fn references(&self) -> &ResolvedIds {
769 self.entry.references()
770 }
771
772 fn uses(&self) -> BTreeSet<CatalogItemId> {
773 self.entry.uses()
774 }
775
776 fn referenced_by(&self) -> &[CatalogItemId] {
777 self.entry.referenced_by()
778 }
779
780 fn used_by(&self) -> &[CatalogItemId] {
781 self.entry.used_by()
782 }
783
784 fn subsource_details(
785 &self,
786 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
787 self.entry.subsource_details()
788 }
789
790 fn source_export_details(
791 &self,
792 ) -> Option<(
793 CatalogItemId,
794 &UnresolvedItemName,
795 &SourceExportDetails,
796 &SourceExportDataConfig<ReferencedConnection>,
797 )> {
798 self.entry.source_export_details()
799 }
800
801 fn is_progress_source(&self) -> bool {
802 self.entry.is_progress_source()
803 }
804
805 fn progress_id(&self) -> Option<CatalogItemId> {
806 self.entry.progress_id()
807 }
808
809 fn owner_id(&self) -> RoleId {
810 *self.entry.owner_id()
811 }
812
813 fn privileges(&self) -> &PrivilegeMap {
814 self.entry.privileges()
815 }
816
817 fn cluster_id(&self) -> Option<ClusterId> {
818 self.entry.item().cluster_id()
819 }
820
821 fn at_version(
822 &self,
823 version: RelationVersionSelector,
824 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
825 Box::new(CatalogCollectionEntry {
826 entry: self.entry.clone(),
827 version,
828 })
829 }
830
831 fn latest_version(&self) -> Option<RelationVersion> {
832 self.entry.latest_version()
833 }
834}
835
836#[derive(Debug, Clone, Serialize)]
837pub enum CatalogItem {
838 Table(Table),
839 Source(Source),
840 Log(Log),
841 View(View),
842 MaterializedView(MaterializedView),
843 Sink(Sink),
844 Index(Index),
845 Type(Type),
846 Func(Func),
847 Secret(Secret),
848 Connection(Connection),
849}
850
851impl From<CatalogEntry> for durable::Item {
852 fn from(entry: CatalogEntry) -> durable::Item {
853 let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
854 durable::Item {
855 id: entry.id,
856 oid: entry.oid,
857 global_id,
858 schema_id: entry.name.qualifiers.schema_spec.into(),
859 name: entry.name.item,
860 create_sql,
861 owner_id: entry.owner_id,
862 privileges: entry.privileges.into_all_values().collect(),
863 extra_versions,
864 }
865 }
866}
867
868#[derive(Debug, Clone, Serialize)]
869pub struct Table {
870 pub create_sql: Option<String>,
872 pub desc: VersionedRelationDesc,
874 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
876 pub collections: BTreeMap<RelationVersion, GlobalId>,
877 #[serde(skip)]
879 pub conn_id: Option<ConnectionId>,
880 pub resolved_ids: ResolvedIds,
882 pub custom_logical_compaction_window: Option<CompactionWindow>,
884 pub is_retained_metrics_object: bool,
889 pub data_source: TableDataSource,
891}
892
893impl Table {
894 pub fn timeline(&self) -> Timeline {
895 match &self.data_source {
896 TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
899 TableDataSource::DataSource { timeline, .. } => timeline.clone(),
900 }
901 }
902
903 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
905 self.collections.values().copied()
906 }
907
908 pub fn global_id_writes(&self) -> GlobalId {
910 *self
911 .collections
912 .last_key_value()
913 .expect("at least one version of a table")
914 .1
915 }
916
917 pub fn collection_descs(
919 &self,
920 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
921 self.collections.iter().map(|(version, gid)| {
922 let desc = self
923 .desc
924 .at_version(RelationVersionSelector::Specific(*version));
925 (*gid, *version, desc)
926 })
927 }
928
929 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
931 let (version, _gid) = self
932 .collections
933 .iter()
934 .find(|(_version, gid)| *gid == id)
935 .expect("GlobalId to exist");
936 self.desc
937 .at_version(RelationVersionSelector::Specific(*version))
938 }
939}
940
941#[derive(Clone, Debug, Serialize)]
942pub enum TableDataSource {
943 TableWrites {
945 #[serde(skip)]
946 defaults: Vec<Expr<Aug>>,
947 },
948
949 DataSource {
952 desc: DataSourceDesc,
953 timeline: Timeline,
954 },
955}
956
957#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
958pub enum DataSourceDesc {
959 Ingestion {
961 desc: SourceDesc<ReferencedConnection>,
962 cluster_id: ClusterId,
963 },
964 OldSyntaxIngestion {
966 desc: SourceDesc<ReferencedConnection>,
967 cluster_id: ClusterId,
968 progress_subsource: CatalogItemId,
971 data_config: SourceExportDataConfig<ReferencedConnection>,
972 details: SourceExportDetails,
973 },
974 IngestionExport {
982 ingestion_id: CatalogItemId,
983 external_reference: UnresolvedItemName,
984 details: SourceExportDetails,
985 data_config: SourceExportDataConfig<ReferencedConnection>,
986 },
987 Introspection(IntrospectionType),
989 Progress,
991 Webhook {
993 validate_using: Option<WebhookValidation>,
995 body_format: WebhookBodyFormat,
997 headers: WebhookHeaders,
999 cluster_id: ClusterId,
1001 },
1002 Catalog,
1004}
1005
1006impl From<IntrospectionType> for DataSourceDesc {
1007 fn from(typ: IntrospectionType) -> Self {
1008 Self::Introspection(typ)
1009 }
1010}
1011
1012impl DataSourceDesc {
1013 pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1015 match &self {
1016 DataSourceDesc::Ingestion { .. } => (None, None),
1017 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1018 match &data_config.encoding.as_ref() {
1019 Some(encoding) => match &encoding.key {
1020 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1021 None => (None, Some(encoding.value.type_())),
1022 },
1023 None => (None, None),
1024 }
1025 }
1026 DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1027 Some(encoding) => match &encoding.key {
1028 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1029 None => (None, Some(encoding.value.type_())),
1030 },
1031 None => (None, None),
1032 },
1033 DataSourceDesc::Introspection(_)
1034 | DataSourceDesc::Webhook { .. }
1035 | DataSourceDesc::Progress
1036 | DataSourceDesc::Catalog => (None, None),
1037 }
1038 }
1039
1040 pub fn envelope(&self) -> Option<&str> {
1042 fn envelope_string(envelope: &SourceEnvelope) -> &str {
1047 match envelope {
1048 SourceEnvelope::None(_) => "none",
1049 SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1050 mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1051 mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1052 "debezium"
1056 }
1057 mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1058 "upsert-value-err-inline"
1059 }
1060 },
1061 SourceEnvelope::CdcV2 => {
1062 "materialize"
1065 }
1066 }
1067 }
1068
1069 match self {
1070 DataSourceDesc::Ingestion { .. } => None,
1075 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1076 Some(envelope_string(&data_config.envelope))
1077 }
1078 DataSourceDesc::IngestionExport { data_config, .. } => {
1079 Some(envelope_string(&data_config.envelope))
1080 }
1081 DataSourceDesc::Introspection(_)
1082 | DataSourceDesc::Webhook { .. }
1083 | DataSourceDesc::Progress
1084 | DataSourceDesc::Catalog => None,
1085 }
1086 }
1087}
1088
1089#[derive(Debug, Clone, Serialize)]
1090pub struct Source {
1091 pub create_sql: Option<String>,
1093 pub global_id: GlobalId,
1095 #[serde(skip)]
1097 pub data_source: DataSourceDesc,
1098 pub desc: RelationDesc,
1100 pub timeline: Timeline,
1102 pub resolved_ids: ResolvedIds,
1104 pub custom_logical_compaction_window: Option<CompactionWindow>,
1108 pub is_retained_metrics_object: bool,
1111}
1112
1113impl Source {
1114 pub fn new(
1121 plan: CreateSourcePlan,
1122 global_id: GlobalId,
1123 resolved_ids: ResolvedIds,
1124 custom_logical_compaction_window: Option<CompactionWindow>,
1125 is_retained_metrics_object: bool,
1126 ) -> Source {
1127 Source {
1128 create_sql: Some(plan.source.create_sql),
1129 data_source: match plan.source.data_source {
1130 mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1131 desc,
1132 cluster_id: plan
1133 .in_cluster
1134 .expect("ingestion-based sources must be given a cluster ID"),
1135 },
1136 mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1137 desc,
1138 progress_subsource,
1139 data_config,
1140 details,
1141 } => DataSourceDesc::OldSyntaxIngestion {
1142 desc,
1143 cluster_id: plan
1144 .in_cluster
1145 .expect("ingestion-based sources must be given a cluster ID"),
1146 progress_subsource,
1147 data_config,
1148 details,
1149 },
1150 mz_sql::plan::DataSourceDesc::Progress => {
1151 assert!(
1152 plan.in_cluster.is_none(),
1153 "subsources must not have a host config or cluster_id defined"
1154 );
1155 DataSourceDesc::Progress
1156 }
1157 mz_sql::plan::DataSourceDesc::IngestionExport {
1158 ingestion_id,
1159 external_reference,
1160 details,
1161 data_config,
1162 } => {
1163 assert!(
1164 plan.in_cluster.is_none(),
1165 "subsources must not have a host config or cluster_id defined"
1166 );
1167 DataSourceDesc::IngestionExport {
1168 ingestion_id,
1169 external_reference,
1170 details,
1171 data_config,
1172 }
1173 }
1174 mz_sql::plan::DataSourceDesc::Webhook {
1175 validate_using,
1176 body_format,
1177 headers,
1178 cluster_id,
1179 } => {
1180 mz_ore::soft_assert_or_log!(
1181 cluster_id.is_none(),
1182 "cluster_id set at Source level for Webhooks"
1183 );
1184 DataSourceDesc::Webhook {
1185 validate_using,
1186 body_format,
1187 headers,
1188 cluster_id: plan
1189 .in_cluster
1190 .expect("webhook sources must be given a cluster ID"),
1191 }
1192 }
1193 },
1194 desc: plan.source.desc,
1195 global_id,
1196 timeline: plan.timeline,
1197 resolved_ids,
1198 custom_logical_compaction_window: plan
1199 .source
1200 .compaction_window
1201 .or(custom_logical_compaction_window),
1202 is_retained_metrics_object,
1203 }
1204 }
1205
1206 pub fn source_type(&self) -> &str {
1208 match &self.data_source {
1209 DataSourceDesc::Ingestion { desc, .. }
1210 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
1211 DataSourceDesc::Progress => "progress",
1212 DataSourceDesc::IngestionExport { .. } => "subsource",
1213 DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => "source",
1214 DataSourceDesc::Webhook { .. } => "webhook",
1215 }
1216 }
1217
1218 pub fn connection_id(&self) -> Option<CatalogItemId> {
1220 match &self.data_source {
1221 DataSourceDesc::Ingestion { desc, .. }
1222 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
1223 DataSourceDesc::IngestionExport { .. }
1224 | DataSourceDesc::Introspection(_)
1225 | DataSourceDesc::Webhook { .. }
1226 | DataSourceDesc::Progress
1227 | DataSourceDesc::Catalog => None,
1228 }
1229 }
1230
1231 pub fn global_id(&self) -> GlobalId {
1233 self.global_id
1234 }
1235
1236 pub fn user_controllable_persist_shard_count(&self) -> i64 {
1244 match &self.data_source {
1245 DataSourceDesc::Ingestion { .. } => 0,
1246 DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1247 match &desc.connection {
1248 GenericSourceConnection::Postgres(_)
1252 | GenericSourceConnection::MySql(_)
1253 | GenericSourceConnection::SqlServer(_) => 0,
1254 GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
1255 LoadGenerator::Clock
1257 | LoadGenerator::Counter { .. }
1258 | LoadGenerator::Datums
1259 | LoadGenerator::KeyValue(_) => 1,
1260 LoadGenerator::Auction
1261 | LoadGenerator::Marketing
1262 | LoadGenerator::Tpch { .. } => 0,
1263 },
1264 GenericSourceConnection::Kafka(_) => 1,
1265 }
1266 }
1267 DataSourceDesc::IngestionExport { .. } => 1,
1270 DataSourceDesc::Webhook { .. } => 1,
1271 DataSourceDesc::Introspection(_)
1274 | DataSourceDesc::Progress
1275 | DataSourceDesc::Catalog => 0,
1276 }
1277 }
1278}
1279
1280#[derive(Debug, Clone, Serialize)]
1281pub struct Log {
1282 pub variant: LogVariant,
1284 pub global_id: GlobalId,
1286}
1287
1288impl Log {
1289 pub fn global_id(&self) -> GlobalId {
1291 self.global_id
1292 }
1293}
1294
1295#[derive(Debug, Clone, Serialize)]
1296pub struct Sink {
1297 pub create_sql: String,
1299 pub global_id: GlobalId,
1301 pub from: GlobalId,
1303 pub connection: StorageSinkConnection<ReferencedConnection>,
1305 pub envelope: SinkEnvelope,
1309 pub with_snapshot: bool,
1311 pub version: u64,
1313 pub resolved_ids: ResolvedIds,
1315 pub cluster_id: ClusterId,
1317 pub commit_interval: Option<Duration>,
1319}
1320
1321impl Sink {
1322 pub fn sink_type(&self) -> &str {
1323 self.connection.name()
1324 }
1325
1326 pub fn envelope(&self) -> Option<&str> {
1328 match &self.envelope {
1329 SinkEnvelope::Debezium => Some("debezium"),
1330 SinkEnvelope::Upsert => Some("upsert"),
1331 SinkEnvelope::Append => Some("append"),
1332 }
1333 }
1334
1335 pub fn combined_format(&self) -> Option<Cow<'_, str>> {
1340 match &self.connection {
1341 StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
1342 StorageSinkConnection::Iceberg(_) => None,
1343 }
1344 }
1345
1346 pub fn formats(&self) -> Option<(Option<&str>, &str)> {
1348 match &self.connection {
1349 StorageSinkConnection::Kafka(connection) => {
1350 let key_format = connection
1351 .format
1352 .key_format
1353 .as_ref()
1354 .map(|f| f.get_format_name());
1355 let value_format = connection.format.value_format.get_format_name();
1356 Some((key_format, value_format))
1357 }
1358 StorageSinkConnection::Iceberg(_) => None,
1359 }
1360 }
1361
1362 pub fn connection_id(&self) -> Option<CatalogItemId> {
1363 self.connection.connection_id()
1364 }
1365
1366 pub fn global_id(&self) -> GlobalId {
1368 self.global_id
1369 }
1370}
1371
/// Catalog representation of a (non-materialized) view (the payload of `CatalogItem::View`).
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// The `CREATE` SQL of this view; re-parsed and rewritten by the `CatalogItem` rename
    /// helpers and `CatalogItem::update_sql`.
    pub create_sql: String,
    /// The [`GlobalId`] of this view, as returned by [`View::global_id`].
    pub global_id: GlobalId,
    /// The view's expression as a shared [`HirRelationExpr`].
    pub raw_expr: Arc<HirRelationExpr>,
    /// The view's expression as a shared [`OptimizedMirRelationExpr`].
    /// NOTE(review): presumably the locally optimized form of `raw_expr` — confirm.
    pub locally_optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// The relation description; returned (borrowed) by `CatalogItem::relation_desc`.
    pub desc: RelationDesc,
    /// Optional connection id; `CatalogItem::is_temporary` reports `true` when this is set.
    pub conn_id: Option<ConnectionId>,
    /// Ids this view references; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Further item ids that `CatalogItem::uses` adds on top of `resolved_ids`.
    pub dependencies: DependencyIds,
}
1391
1392impl View {
1393 pub fn global_id(&self) -> GlobalId {
1395 self.global_id
1396 }
1397}
1398
/// Catalog representation of a materialized view (the payload of
/// `CatalogItem::MaterializedView`).
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// The `CREATE` SQL of this materialized view; re-parsed and rewritten by the
    /// `CatalogItem` rename helpers, `CatalogItem::update_sql` and `apply_replacement`.
    pub create_sql: String,
    /// The [`GlobalId`] of the collection for each relation version. The entry with the
    /// greatest version is the one returned by `global_id_writes`. Map keys are serialized
    /// as strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// The view's expression as a shared [`HirRelationExpr`].
    pub raw_expr: Arc<HirRelationExpr>,
    /// The view's expression as a shared [`OptimizedMirRelationExpr`].
    /// NOTE(review): presumably the locally optimized form of `raw_expr` — confirm.
    pub locally_optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Versioned relation description; `at_version` yields the description of one version.
    pub desc: VersionedRelationDesc,
    /// Ids this materialized view references; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Further item ids that `CatalogItem::uses` adds on top of `resolved_ids`.
    pub dependencies: DependencyIds,
    /// The item this materialized view is a replacement for, if any. `apply_replacement`
    /// requires it to be set on the replacement and resets it to `None` on the result.
    pub replacement_target: Option<CatalogItemId>,
    /// The cluster of this materialized view; returned by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// NOTE(review): presumably an optional replica the view is pinned to — confirm.
    pub target_replica: Option<ReplicaId>,
    /// NOTE(review): presumably indexes of columns asserted to be non-null — confirm.
    pub non_null_assertions: Vec<usize>,
    /// Custom compaction window, if any. When `None`,
    /// `CatalogItem::initial_logical_compaction_window` falls back to
    /// `CompactionWindow::Default`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Optional refresh schedule. NOTE(review): how it is applied is not visible here.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// Optional initial as-of frontier. NOTE(review): how it is applied is not visible here.
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
    /// Optimized dataflow plan, if present; exposed via `CatalogItem::optimized_plan`.
    /// Not serialized.
    #[serde(skip)]
    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical dataflow plan, if present; exposed via `CatalogItem::physical_plan`.
    /// Not serialized.
    #[serde(skip)]
    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
    /// Dataflow metainfo, if present; exposed via `CatalogItem::dataflow_metainfo`.
    /// Not serialized.
    #[serde(skip)]
    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
}
1449
1450impl MaterializedView {
1451 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1453 self.collections.values().copied()
1454 }
1455
1456 pub fn global_id_writes(&self) -> GlobalId {
1459 *self
1460 .collections
1461 .last_key_value()
1462 .expect("at least one version of a materialized view")
1463 .1
1464 }
1465
1466 pub fn collection_descs(
1468 &self,
1469 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
1470 self.collections.iter().map(|(version, gid)| {
1471 let desc = self
1472 .desc
1473 .at_version(RelationVersionSelector::Specific(*version));
1474 (*gid, *version, desc)
1475 })
1476 }
1477
1478 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
1480 let (version, _gid) = self
1481 .collections
1482 .iter()
1483 .find(|(_version, gid)| *gid == id)
1484 .expect("GlobalId to exist");
1485 self.desc
1486 .at_version(RelationVersionSelector::Specific(*version))
1487 }
1488
1489 pub fn apply_replacement(&mut self, replacement: Self) {
1491 let target_id = replacement
1492 .replacement_target
1493 .expect("replacement has target");
1494
1495 fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
1496 let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
1497 panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
1498 });
1499 if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
1500 cmvs
1501 } else {
1502 panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
1503 }
1504 }
1505
1506 let old_stmt = parse(&self.create_sql);
1507 let rpl_stmt = parse(&replacement.create_sql);
1508 let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
1509 if_exists: old_stmt.if_exists,
1510 name: old_stmt.name,
1511 columns: rpl_stmt.columns,
1512 replacement_for: None,
1513 in_cluster: rpl_stmt.in_cluster,
1514 in_cluster_replica: rpl_stmt.in_cluster_replica,
1515 query: rpl_stmt.query,
1516 as_of: rpl_stmt.as_of,
1517 with_options: rpl_stmt.with_options,
1518 };
1519 let create_sql = new_stmt.to_ast_string_stable();
1520
1521 let mut collections = std::mem::take(&mut self.collections);
1522 let latest_version = collections.keys().max().expect("at least one version");
1526 let new_version = latest_version.bump();
1527 collections.insert(new_version, replacement.global_id_writes());
1528
1529 let mut resolved_ids = replacement.resolved_ids;
1530 resolved_ids.remove_item(&target_id);
1531 let mut dependencies = replacement.dependencies;
1532 dependencies.0.remove(&target_id);
1533
1534 *self = Self {
1535 create_sql,
1536 collections,
1537 raw_expr: replacement.raw_expr,
1538 locally_optimized_expr: replacement.locally_optimized_expr,
1539 desc: replacement.desc,
1540 resolved_ids,
1541 dependencies,
1542 replacement_target: None,
1543 cluster_id: replacement.cluster_id,
1544 target_replica: replacement.target_replica,
1545 non_null_assertions: replacement.non_null_assertions,
1546 custom_logical_compaction_window: replacement.custom_logical_compaction_window,
1547 refresh_schedule: replacement.refresh_schedule,
1548 initial_as_of: replacement.initial_as_of,
1549 optimized_plan: replacement.optimized_plan,
1550 physical_plan: replacement.physical_plan,
1551 dataflow_metainfo: replacement.dataflow_metainfo,
1552 };
1553 }
1554}
1555
/// Catalog representation of an index (the payload of `CatalogItem::Index`).
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// The `CREATE` SQL of this index; re-parsed and rewritten by the `CatalogItem` rename
    /// helpers and `CatalogItem::update_sql`.
    pub create_sql: String,
    /// The [`GlobalId`] of this index, as returned by [`Index::global_id`].
    pub global_id: GlobalId,
    /// NOTE(review): presumably the `GlobalId` of the collection being indexed — inferred
    /// from the field name only; confirm.
    pub on: GlobalId,
    /// The index key expressions, shared via `Arc`.
    pub keys: Arc<[MirScalarExpr]>,
    /// Optional connection id; `CatalogItem::is_temporary` reports `true` when this is set.
    pub conn_id: Option<ConnectionId>,
    /// Ids this index references; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// The cluster of this index; returned by `CatalogItem::cluster_id` and
    /// `CatalogItem::is_compute_object_on_cluster`.
    pub cluster_id: ClusterId,
    /// Custom compaction window, if any. When `None`,
    /// `CatalogItem::initial_logical_compaction_window` falls back to
    /// `CompactionWindow::Default`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Flag returned by `CatalogItem::is_retained_metrics_object` for indexes.
    pub is_retained_metrics_object: bool,
    /// Optimized dataflow plan, if present; exposed via `CatalogItem::optimized_plan`.
    /// Not serialized.
    #[serde(skip)]
    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical dataflow plan, if present; exposed via `CatalogItem::physical_plan`.
    /// Not serialized.
    #[serde(skip)]
    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
    /// Dataflow metainfo, if present; exposed via `CatalogItem::dataflow_metainfo`.
    /// Not serialized.
    #[serde(skip)]
    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
}
1593
1594impl Index {
1595 pub fn global_id(&self) -> GlobalId {
1597 self.global_id
1598 }
1599}
1600
/// Catalog representation of a type (the payload of `CatalogItem::Type`).
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// The `CREATE` SQL of this type, when it has one; `CatalogItem::update_sql` fails when
    /// this is `None`.
    /// NOTE(review): presumably `None` for builtin types — confirm.
    pub create_sql: Option<String>,
    /// The [`GlobalId`] of this type.
    pub global_id: GlobalId,
    /// Type details. Not serialized.
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// Ids this type references; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
}
1612
/// Catalog representation of a function (the payload of `CatalogItem::Func`).
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// The statically allocated function definition, returned by `CatalogItem::func`.
    /// Not serialized.
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
    /// The [`GlobalId`] of this function.
    pub global_id: GlobalId,
}
1621
/// Catalog representation of a secret (the payload of `CatalogItem::Secret`).
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// The `CREATE` SQL of this secret; re-parsed and rewritten by the `CatalogItem` rename
    /// helpers and `CatalogItem::update_sql`.
    pub create_sql: String,
    /// The [`GlobalId`] of this secret.
    pub global_id: GlobalId,
}
1629
/// Catalog representation of a connection (the payload of `CatalogItem::Connection`).
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// The `CREATE` SQL of this connection; re-parsed and rewritten by the `CatalogItem`
    /// rename helpers and `CatalogItem::update_sql`.
    pub create_sql: String,
    /// The [`GlobalId`] of this connection, as returned by [`Connection::global_id`].
    pub global_id: GlobalId,
    /// The connection's details.
    pub details: ConnectionDetails,
    /// Ids this connection references; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
}
1641
1642impl Connection {
1643 pub fn global_id(&self) -> GlobalId {
1645 self.global_id
1646 }
1647}
1648
/// In-memory representation of a network policy.
///
/// Converts to and from `durable::NetworkPolicy`. The two carry the same fields, except that
/// privileges are held here as a [`PrivilegeMap`] and in the durable form as a collection of
/// ACL items.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
    /// The policy's name.
    pub name: String,
    /// The policy's id.
    pub id: NetworkPolicyId,
    /// The policy's OID.
    pub oid: u32,
    /// The rules that make up this policy.
    pub rules: Vec<NetworkPolicyRule>,
    /// The role that owns this policy.
    pub owner_id: RoleId,
    /// Privileges on this policy.
    pub privileges: PrivilegeMap,
}
1658
1659impl From<NetworkPolicy> for durable::NetworkPolicy {
1660 fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1661 durable::NetworkPolicy {
1662 id: policy.id,
1663 oid: policy.oid,
1664 name: policy.name,
1665 rules: policy.rules,
1666 owner_id: policy.owner_id,
1667 privileges: policy.privileges.into_all_values().collect(),
1668 }
1669 }
1670}
1671
1672impl From<durable::NetworkPolicy> for NetworkPolicy {
1673 fn from(
1674 durable::NetworkPolicy {
1675 id,
1676 oid,
1677 name,
1678 rules,
1679 owner_id,
1680 privileges,
1681 }: durable::NetworkPolicy,
1682 ) -> Self {
1683 NetworkPolicy {
1684 id,
1685 oid,
1686 name,
1687 rules,
1688 owner_id,
1689 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1690 }
1691 }
1692}
1693
1694impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1695 fn update_from(
1696 &mut self,
1697 durable::NetworkPolicy {
1698 id,
1699 oid,
1700 name,
1701 rules,
1702 owner_id,
1703 privileges,
1704 }: durable::NetworkPolicy,
1705 ) {
1706 self.id = id;
1707 self.oid = oid;
1708 self.name = name;
1709 self.rules = rules;
1710 self.owner_id = owner_id;
1711 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1712 }
1713}
1714
1715impl CatalogItem {
1716 pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1718 match self {
1719 CatalogItem::Table(_) => CatalogItemType::Table,
1720 CatalogItem::Source(_) => CatalogItemType::Source,
1721 CatalogItem::Log(_) => CatalogItemType::Source,
1722 CatalogItem::Sink(_) => CatalogItemType::Sink,
1723 CatalogItem::View(_) => CatalogItemType::View,
1724 CatalogItem::MaterializedView(_) => CatalogItemType::MaterializedView,
1725 CatalogItem::Index(_) => CatalogItemType::Index,
1726 CatalogItem::Type(_) => CatalogItemType::Type,
1727 CatalogItem::Func(_) => CatalogItemType::Func,
1728 CatalogItem::Secret(_) => CatalogItemType::Secret,
1729 CatalogItem::Connection(_) => CatalogItemType::Connection,
1730 }
1731 }
1732
1733 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1735 let gid = match self {
1736 CatalogItem::Source(source) => source.global_id,
1737 CatalogItem::Log(log) => log.global_id,
1738 CatalogItem::Sink(sink) => sink.global_id,
1739 CatalogItem::View(view) => view.global_id,
1740 CatalogItem::MaterializedView(mv) => {
1741 return itertools::Either::Left(mv.collections.values().copied());
1742 }
1743 CatalogItem::Index(index) => index.global_id,
1744 CatalogItem::Func(func) => func.global_id,
1745 CatalogItem::Type(ty) => ty.global_id,
1746 CatalogItem::Secret(secret) => secret.global_id,
1747 CatalogItem::Connection(conn) => conn.global_id,
1748 CatalogItem::Table(table) => {
1749 return itertools::Either::Left(table.collections.values().copied());
1750 }
1751 };
1752 itertools::Either::Right(std::iter::once(gid))
1753 }
1754
1755 pub fn latest_global_id(&self) -> GlobalId {
1759 match self {
1760 CatalogItem::Source(source) => source.global_id,
1761 CatalogItem::Log(log) => log.global_id,
1762 CatalogItem::Sink(sink) => sink.global_id,
1763 CatalogItem::View(view) => view.global_id,
1764 CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
1765 CatalogItem::Index(index) => index.global_id,
1766 CatalogItem::Func(func) => func.global_id,
1767 CatalogItem::Type(ty) => ty.global_id,
1768 CatalogItem::Secret(secret) => secret.global_id,
1769 CatalogItem::Connection(conn) => conn.global_id,
1770 CatalogItem::Table(table) => table.global_id_writes(),
1771 }
1772 }
1773
1774 pub fn optimized_plan(&self) -> Option<&Arc<DataflowDescription<OptimizedMirRelationExpr>>> {
1776 match self {
1777 CatalogItem::Index(idx) => idx.optimized_plan.as_ref(),
1778 CatalogItem::MaterializedView(mv) => mv.optimized_plan.as_ref(),
1779 _ => None,
1780 }
1781 }
1782
1783 pub fn physical_plan(&self) -> Option<&Arc<DataflowDescription<ComputePlan>>> {
1785 match self {
1786 CatalogItem::Index(idx) => idx.physical_plan.as_ref(),
1787 CatalogItem::MaterializedView(mv) => mv.physical_plan.as_ref(),
1788 _ => None,
1789 }
1790 }
1791
1792 pub fn dataflow_metainfo(&self) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
1794 match self {
1795 CatalogItem::Index(idx) => idx.dataflow_metainfo.as_ref(),
1796 CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo.as_ref(),
1797 _ => None,
1798 }
1799 }
1800
1801 pub fn plan_fields_mut(
1806 &mut self,
1807 ) -> Option<(
1808 &mut Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
1809 &mut Option<Arc<DataflowDescription<ComputePlan>>>,
1810 &mut Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
1811 )> {
1812 match self {
1813 CatalogItem::Index(idx) => Some((
1814 &mut idx.optimized_plan,
1815 &mut idx.physical_plan,
1816 &mut idx.dataflow_metainfo,
1817 )),
1818 CatalogItem::MaterializedView(mv) => Some((
1819 &mut mv.optimized_plan,
1820 &mut mv.physical_plan,
1821 &mut mv.dataflow_metainfo,
1822 )),
1823 _ => None,
1824 }
1825 }
1826
1827 pub fn is_storage_collection(&self) -> bool {
1829 match self {
1830 CatalogItem::Table(_)
1831 | CatalogItem::Source(_)
1832 | CatalogItem::MaterializedView(_)
1833 | CatalogItem::Sink(_) => true,
1834 CatalogItem::Log(_)
1835 | CatalogItem::View(_)
1836 | CatalogItem::Index(_)
1837 | CatalogItem::Type(_)
1838 | CatalogItem::Func(_)
1839 | CatalogItem::Secret(_)
1840 | CatalogItem::Connection(_) => false,
1841 }
1842 }
1843
1844 pub fn relation_desc(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1853 match &self {
1854 CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1855 CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1856 CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1857 CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1858 CatalogItem::MaterializedView(mview) => {
1859 Some(Cow::Owned(mview.desc.at_version(version)))
1860 }
1861 CatalogItem::Func(_)
1862 | CatalogItem::Index(_)
1863 | CatalogItem::Sink(_)
1864 | CatalogItem::Secret(_)
1865 | CatalogItem::Connection(_)
1866 | CatalogItem::Type(_) => None,
1867 }
1868 }
1869
1870 pub fn func(
1871 &self,
1872 entry: &CatalogEntry,
1873 ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1874 match &self {
1875 CatalogItem::Func(func) => Ok(func.inner),
1876 _ => Err(SqlCatalogError::UnexpectedType {
1877 name: entry.name().item.to_string(),
1878 actual_type: entry.item_type(),
1879 expected_type: CatalogItemType::Func,
1880 }),
1881 }
1882 }
1883
1884 pub fn source_desc(
1885 &self,
1886 entry: &CatalogEntry,
1887 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1888 match &self {
1889 CatalogItem::Source(source) => match &source.data_source {
1890 DataSourceDesc::Ingestion { desc, .. }
1891 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1892 DataSourceDesc::IngestionExport { .. }
1893 | DataSourceDesc::Introspection(_)
1894 | DataSourceDesc::Webhook { .. }
1895 | DataSourceDesc::Progress
1896 | DataSourceDesc::Catalog => Ok(None),
1897 },
1898 _ => Err(SqlCatalogError::UnexpectedType {
1899 name: entry.name().item.to_string(),
1900 actual_type: entry.item_type(),
1901 expected_type: CatalogItemType::Source,
1902 }),
1903 }
1904 }
1905
1906 pub fn is_progress_source(&self) -> bool {
1908 matches!(
1909 self,
1910 CatalogItem::Source(Source {
1911 data_source: DataSourceDesc::Progress,
1912 ..
1913 })
1914 )
1915 }
1916
1917 pub fn references(&self) -> &ResolvedIds {
1920 static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1921 match self {
1922 CatalogItem::Func(_) => &*EMPTY,
1923 CatalogItem::Index(idx) => &idx.resolved_ids,
1924 CatalogItem::Sink(sink) => &sink.resolved_ids,
1925 CatalogItem::Source(source) => &source.resolved_ids,
1926 CatalogItem::Log(_) => &*EMPTY,
1927 CatalogItem::Table(table) => &table.resolved_ids,
1928 CatalogItem::Type(typ) => &typ.resolved_ids,
1929 CatalogItem::View(view) => &view.resolved_ids,
1930 CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1931 CatalogItem::Secret(_) => &*EMPTY,
1932 CatalogItem::Connection(connection) => &connection.resolved_ids,
1933 }
1934 }
1935
1936 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1942 let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1943 match self {
1944 CatalogItem::Func(_) => {}
1947 CatalogItem::Index(_) => {}
1948 CatalogItem::Sink(_) => {}
1949 CatalogItem::Source(_) => {}
1950 CatalogItem::Log(_) => {}
1951 CatalogItem::Table(_) => {}
1952 CatalogItem::Type(_) => {}
1953 CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1954 CatalogItem::MaterializedView(mview) => {
1955 uses.extend(mview.dependencies.0.iter().copied())
1956 }
1957 CatalogItem::Secret(_) => {}
1958 CatalogItem::Connection(_) => {}
1959 }
1960 uses
1961 }
1962
1963 pub fn conn_id(&self) -> Option<&ConnectionId> {
1966 match self {
1967 CatalogItem::View(view) => view.conn_id.as_ref(),
1968 CatalogItem::Index(index) => index.conn_id.as_ref(),
1969 CatalogItem::Table(table) => table.conn_id.as_ref(),
1970 CatalogItem::Log(_)
1971 | CatalogItem::Source(_)
1972 | CatalogItem::Sink(_)
1973 | CatalogItem::MaterializedView(_)
1974 | CatalogItem::Secret(_)
1975 | CatalogItem::Type(_)
1976 | CatalogItem::Func(_)
1977 | CatalogItem::Connection(_) => None,
1978 }
1979 }
1980
1981 pub fn set_conn_id(&mut self, conn_id: Option<ConnectionId>) {
1984 match self {
1985 CatalogItem::View(view) => view.conn_id = conn_id,
1986 CatalogItem::Index(index) => index.conn_id = conn_id,
1987 CatalogItem::Table(table) => table.conn_id = conn_id,
1988 CatalogItem::Log(_)
1989 | CatalogItem::Source(_)
1990 | CatalogItem::Sink(_)
1991 | CatalogItem::MaterializedView(_)
1992 | CatalogItem::Secret(_)
1993 | CatalogItem::Type(_)
1994 | CatalogItem::Func(_)
1995 | CatalogItem::Connection(_) => (),
1996 }
1997 }
1998
1999 pub fn is_temporary(&self) -> bool {
2001 self.conn_id().is_some()
2002 }
2003
2004 pub fn rename_schema_refs(
2005 &self,
2006 database_name: &str,
2007 cur_schema_name: &str,
2008 new_schema_name: &str,
2009 ) -> Result<CatalogItem, (String, String)> {
2010 let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
2011 let mut create_stmt = mz_sql::parse::parse(&create_sql)
2012 .expect("invalid create sql persisted to catalog")
2013 .into_element()
2014 .ast;
2015
2016 mz_sql::ast::transform::create_stmt_rename_schema_refs(
2018 &mut create_stmt,
2019 database_name,
2020 cur_schema_name,
2021 new_schema_name,
2022 )?;
2023
2024 Ok(create_stmt.to_ast_string_stable())
2025 };
2026
2027 match self {
2028 CatalogItem::Table(i) => {
2029 let mut i = i.clone();
2030 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2031 Ok(CatalogItem::Table(i))
2032 }
2033 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
2034 CatalogItem::Source(i) => {
2035 let mut i = i.clone();
2036 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2037 Ok(CatalogItem::Source(i))
2038 }
2039 CatalogItem::Sink(i) => {
2040 let mut i = i.clone();
2041 i.create_sql = do_rewrite(i.create_sql)?;
2042 Ok(CatalogItem::Sink(i))
2043 }
2044 CatalogItem::View(i) => {
2045 let mut i = i.clone();
2046 i.create_sql = do_rewrite(i.create_sql)?;
2047 Ok(CatalogItem::View(i))
2048 }
2049 CatalogItem::MaterializedView(i) => {
2050 let mut i = i.clone();
2051 i.create_sql = do_rewrite(i.create_sql)?;
2052 Ok(CatalogItem::MaterializedView(i))
2053 }
2054 CatalogItem::Index(i) => {
2055 let mut i = i.clone();
2056 i.create_sql = do_rewrite(i.create_sql)?;
2057 Ok(CatalogItem::Index(i))
2058 }
2059 CatalogItem::Secret(i) => {
2060 let mut i = i.clone();
2061 i.create_sql = do_rewrite(i.create_sql)?;
2062 Ok(CatalogItem::Secret(i))
2063 }
2064 CatalogItem::Connection(i) => {
2065 let mut i = i.clone();
2066 i.create_sql = do_rewrite(i.create_sql)?;
2067 Ok(CatalogItem::Connection(i))
2068 }
2069 CatalogItem::Type(i) => {
2070 let mut i = i.clone();
2071 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2072 Ok(CatalogItem::Type(i))
2073 }
2074 CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
2075 }
2076 }
2077
2078 pub fn rename_item_refs(
2082 &self,
2083 from: FullItemName,
2084 to_item_name: String,
2085 rename_self: bool,
2086 ) -> Result<CatalogItem, String> {
2087 let do_rewrite = |create_sql: String| -> Result<String, String> {
2088 let mut create_stmt = mz_sql::parse::parse(&create_sql)
2089 .expect("invalid create sql persisted to catalog")
2090 .into_element()
2091 .ast;
2092 if rename_self {
2093 mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
2094 }
2095 mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
2097 Ok(create_stmt.to_ast_string_stable())
2098 };
2099
2100 match self {
2101 CatalogItem::Table(i) => {
2102 let mut i = i.clone();
2103 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2104 Ok(CatalogItem::Table(i))
2105 }
2106 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
2107 CatalogItem::Source(i) => {
2108 let mut i = i.clone();
2109 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2110 Ok(CatalogItem::Source(i))
2111 }
2112 CatalogItem::Sink(i) => {
2113 let mut i = i.clone();
2114 i.create_sql = do_rewrite(i.create_sql)?;
2115 Ok(CatalogItem::Sink(i))
2116 }
2117 CatalogItem::View(i) => {
2118 let mut i = i.clone();
2119 i.create_sql = do_rewrite(i.create_sql)?;
2120 Ok(CatalogItem::View(i))
2121 }
2122 CatalogItem::MaterializedView(i) => {
2123 let mut i = i.clone();
2124 i.create_sql = do_rewrite(i.create_sql)?;
2125 Ok(CatalogItem::MaterializedView(i))
2126 }
2127 CatalogItem::Index(i) => {
2128 let mut i = i.clone();
2129 i.create_sql = do_rewrite(i.create_sql)?;
2130 Ok(CatalogItem::Index(i))
2131 }
2132 CatalogItem::Secret(i) => {
2133 let mut i = i.clone();
2134 i.create_sql = do_rewrite(i.create_sql)?;
2135 Ok(CatalogItem::Secret(i))
2136 }
2137 CatalogItem::Func(_) | CatalogItem::Type(_) => {
2138 unreachable!("{}s cannot be renamed", self.typ())
2139 }
2140 CatalogItem::Connection(i) => {
2141 let mut i = i.clone();
2142 i.create_sql = do_rewrite(i.create_sql)?;
2143 Ok(CatalogItem::Connection(i))
2144 }
2145 }
2146 }
2147
2148 pub fn replace_item_refs(&self, old_id: CatalogItemId, new_id: CatalogItemId) -> CatalogItem {
2150 let do_rewrite = |create_sql: String| -> String {
2151 let mut create_stmt = mz_sql::parse::parse(&create_sql)
2152 .expect("invalid create sql persisted to catalog")
2153 .into_element()
2154 .ast;
2155 mz_sql::ast::transform::create_stmt_replace_ids(
2156 &mut create_stmt,
2157 &[(old_id, new_id)].into(),
2158 );
2159 create_stmt.to_ast_string_stable()
2160 };
2161
2162 match self {
2163 CatalogItem::Table(i) => {
2164 let mut i = i.clone();
2165 i.create_sql = i.create_sql.map(do_rewrite);
2166 CatalogItem::Table(i)
2167 }
2168 CatalogItem::Log(i) => CatalogItem::Log(i.clone()),
2169 CatalogItem::Source(i) => {
2170 let mut i = i.clone();
2171 i.create_sql = i.create_sql.map(do_rewrite);
2172 CatalogItem::Source(i)
2173 }
2174 CatalogItem::Sink(i) => {
2175 let mut i = i.clone();
2176 i.create_sql = do_rewrite(i.create_sql);
2177 CatalogItem::Sink(i)
2178 }
2179 CatalogItem::View(i) => {
2180 let mut i = i.clone();
2181 i.create_sql = do_rewrite(i.create_sql);
2182 CatalogItem::View(i)
2183 }
2184 CatalogItem::MaterializedView(i) => {
2185 let mut i = i.clone();
2186 i.create_sql = do_rewrite(i.create_sql);
2187 CatalogItem::MaterializedView(i)
2188 }
2189 CatalogItem::Index(i) => {
2190 let mut i = i.clone();
2191 i.create_sql = do_rewrite(i.create_sql);
2192 CatalogItem::Index(i)
2193 }
2194 CatalogItem::Secret(i) => {
2195 let mut i = i.clone();
2196 i.create_sql = do_rewrite(i.create_sql);
2197 CatalogItem::Secret(i)
2198 }
2199 CatalogItem::Func(_) | CatalogItem::Type(_) => {
2200 unreachable!("references of {}s cannot be replaced", self.typ())
2201 }
2202 CatalogItem::Connection(i) => {
2203 let mut i = i.clone();
2204 i.create_sql = do_rewrite(i.create_sql);
2205 CatalogItem::Connection(i)
2206 }
2207 }
2208 }
/// Updates the `RetainHistory` option in this item's `create_sql` and stores `window` as the
/// item's custom logical compaction window.
///
/// With `Some(value)`, the last existing `RetainHistory` option is replaced, or a new one is
/// appended if there is none. With `None`, the last existing option is removed. In both
/// cases the previous option value (if any) is returned.
///
/// # Errors
///
/// Returns `Err(())` if `update_sql` fails or the statement is not a `CREATE TABLE`,
/// `CREATE INDEX`, `CREATE SOURCE` or `CREATE MATERIALIZED VIEW`.
///
/// # Panics
///
/// Panics if the SQL rewrite succeeds but the item has no compaction window field.
pub fn update_retain_history(
    &mut self,
    value: Option<Value>,
    window: CompactionWindow,
) -> Result<Option<WithOptionValue<Raw>>, ()> {
    let update = |mut ast: &mut Statement<Raw>| {
        // Each statement kind has its own option and option-name types, so the shared logic
        // is a macro parameterized over them rather than a function.
        macro_rules! update_retain_history {
            ( $stmt:ident, $opt:ident, $name:ident ) => {{
                // Search from the back so that, should the option appear more than once,
                // the last occurrence is the one that gets updated.
                let pos = $stmt
                    .with_options
                    .iter()
                    .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
                if let Some(value) = value {
                    // Set: replace in place or append.
                    let next = mz_sql_parser::ast::$opt {
                        name: mz_sql_parser::ast::$name::RetainHistory,
                        value: Some(WithOptionValue::RetainHistoryFor(value)),
                    };
                    if let Some(idx) = pos {
                        let previous = $stmt.with_options[idx].clone();
                        $stmt.with_options[idx] = next;
                        previous.value
                    } else {
                        $stmt.with_options.push(next);
                        None
                    }
                } else {
                    // Reset: drop the option. `swap_remove` moves the last option into the
                    // vacated slot, so the order of the remaining options may change.
                    if let Some(idx) = pos {
                        $stmt.with_options.swap_remove(idx).value
                    } else {
                        None
                    }
                }
            }};
        }
        let previous = match &mut ast {
            Statement::CreateTable(stmt) => {
                update_retain_history!(stmt, TableOption, TableOptionName)
            }
            Statement::CreateIndex(stmt) => {
                update_retain_history!(stmt, IndexOption, IndexOptionName)
            }
            Statement::CreateSource(stmt) => {
                update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
            }
            Statement::CreateMaterializedView(stmt) => {
                update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
            }
            _ => {
                return Err(());
            }
        };
        Ok(previous)
    };

    let res = self.update_sql(update)?;
    // The window is stored as `Some(window)` regardless of whether `value` was `Some` or
    // `None`.
    let cw = self
        .custom_logical_compaction_window_mut()
        .expect("item must have compaction window");
    *cw = Some(window);
    Ok(res)
}
2275
2276 pub fn update_timestamp_interval(
2279 &mut self,
2280 value: Option<Value>,
2281 interval: Duration,
2282 ) -> Result<Option<WithOptionValue<Raw>>, ()> {
2283 let update = |ast: &mut Statement<Raw>| match ast {
2284 Statement::CreateSource(stmt) => {
2285 let pos = stmt.with_options.iter().rposition(|o| {
2286 o.name == mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval
2287 });
2288 let previous = if let Some(value) = value {
2289 let next = mz_sql_parser::ast::CreateSourceOption {
2290 name: mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval,
2291 value: Some(WithOptionValue::Value(value)),
2292 };
2293 if let Some(idx) = pos {
2294 let previous = stmt.with_options[idx].clone();
2295 stmt.with_options[idx] = next;
2296 previous.value
2297 } else {
2298 stmt.with_options.push(next);
2299 None
2300 }
2301 } else if let Some(idx) = pos {
2302 stmt.with_options.swap_remove(idx).value
2303 } else {
2304 None
2305 };
2306 Ok(previous)
2307 }
2308 _ => Err(()),
2309 };
2310
2311 let previous = self.update_sql(update)?;
2312
2313 match self {
2315 CatalogItem::Source(source) => {
2316 match &mut source.data_source {
2317 DataSourceDesc::Ingestion { desc, .. }
2318 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
2319 desc.timestamp_interval = interval;
2320 }
2321 _ => return Err(()),
2322 }
2323 Ok(previous)
2324 }
2325 _ => Err(()),
2326 }
2327 }
2328
2329 pub fn add_column(
2330 &mut self,
2331 name: ColumnName,
2332 typ: SqlColumnType,
2333 sql: RawDataType,
2334 ) -> Result<RelationVersion, PlanError> {
2335 let CatalogItem::Table(table) = self else {
2336 return Err(PlanError::Unsupported {
2337 feature: "adding columns to a non-Table".to_string(),
2338 discussion_no: None,
2339 });
2340 };
2341 let next_version = table.desc.add_column(name.clone(), typ);
2342
2343 let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2344 Statement::CreateTable(stmt) => {
2345 let version = ColumnOptionDef {
2346 name: None,
2347 option: ColumnOption::Versioned {
2348 action: ColumnVersioned::Added,
2349 version: next_version.into(),
2350 },
2351 };
2352 let column = ColumnDef {
2353 name: name.into(),
2354 data_type: sql,
2355 collation: None,
2356 options: vec![version],
2357 };
2358 stmt.columns.push(column);
2359 Ok(())
2360 }
2361 _ => Err(()),
2362 };
2363
2364 self.update_sql(update)
2365 .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2366 Ok(next_version)
2367 }
2368
2369 pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2372 where
2373 F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2374 {
2375 let create_sql = match self {
2376 CatalogItem::Table(Table { create_sql, .. })
2377 | CatalogItem::Type(Type { create_sql, .. })
2378 | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2379 CatalogItem::Sink(Sink { create_sql, .. })
2380 | CatalogItem::View(View { create_sql, .. })
2381 | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2382 | CatalogItem::Index(Index { create_sql, .. })
2383 | CatalogItem::Secret(Secret { create_sql, .. })
2384 | CatalogItem::Connection(Connection { create_sql, .. }) => Some(create_sql),
2385 CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2386 };
2387 let Some(create_sql) = create_sql else {
2388 return Err(());
2389 };
2390 let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2391 .expect("non-system items must be parseable")
2392 .into_element()
2393 .ast;
2394 debug!("rewrite: {}", ast.to_ast_string_redacted());
2395 let t = f(&mut ast)?;
2396 *create_sql = ast.to_ast_string_stable();
2397 debug!("rewrote: {}", ast.to_ast_string_redacted());
2398 Ok(t)
2399 }
2400
2401 pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2408 match self {
2409 CatalogItem::Index(index) => Some(index.cluster_id),
2410 CatalogItem::Table(_)
2411 | CatalogItem::Source(_)
2412 | CatalogItem::Log(_)
2413 | CatalogItem::View(_)
2414 | CatalogItem::MaterializedView(_)
2415 | CatalogItem::Sink(_)
2416 | CatalogItem::Type(_)
2417 | CatalogItem::Func(_)
2418 | CatalogItem::Secret(_)
2419 | CatalogItem::Connection(_) => None,
2420 }
2421 }
2422
2423 pub fn cluster_id(&self) -> Option<ClusterId> {
2424 match self {
2425 CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2426 CatalogItem::Index(index) => Some(index.cluster_id),
2427 CatalogItem::Source(source) => match &source.data_source {
2428 DataSourceDesc::Ingestion { cluster_id, .. }
2429 | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
2430 DataSourceDesc::IngestionExport { .. } => None,
2434 DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2435 DataSourceDesc::Introspection(_)
2436 | DataSourceDesc::Progress
2437 | DataSourceDesc::Catalog => None,
2438 },
2439 CatalogItem::Sink(sink) => Some(sink.cluster_id),
2440 CatalogItem::Table(_)
2441 | CatalogItem::Log(_)
2442 | CatalogItem::View(_)
2443 | CatalogItem::Type(_)
2444 | CatalogItem::Func(_)
2445 | CatalogItem::Secret(_)
2446 | CatalogItem::Connection(_) => None,
2447 }
2448 }
2449
2450 pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2453 match self {
2454 CatalogItem::Table(table) => table.custom_logical_compaction_window,
2455 CatalogItem::Source(source) => source.custom_logical_compaction_window,
2456 CatalogItem::Index(index) => index.custom_logical_compaction_window,
2457 CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2458 CatalogItem::Log(_)
2459 | CatalogItem::View(_)
2460 | CatalogItem::Sink(_)
2461 | CatalogItem::Type(_)
2462 | CatalogItem::Func(_)
2463 | CatalogItem::Secret(_)
2464 | CatalogItem::Connection(_) => None,
2465 }
2466 }
2467
2468 pub fn custom_logical_compaction_window_mut(
2472 &mut self,
2473 ) -> Option<&mut Option<CompactionWindow>> {
2474 let cw = match self {
2475 CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2476 CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2477 CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2478 CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2479 CatalogItem::Log(_)
2480 | CatalogItem::View(_)
2481 | CatalogItem::Sink(_)
2482 | CatalogItem::Type(_)
2483 | CatalogItem::Func(_)
2484 | CatalogItem::Secret(_)
2485 | CatalogItem::Connection(_) => return None,
2486 };
2487 Some(cw)
2488 }
2489
2490 pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2498 let custom_logical_compaction_window = match self {
2499 CatalogItem::Table(_)
2500 | CatalogItem::Source(_)
2501 | CatalogItem::Index(_)
2502 | CatalogItem::MaterializedView(_) => self.custom_logical_compaction_window(),
2503 CatalogItem::Log(_)
2504 | CatalogItem::View(_)
2505 | CatalogItem::Sink(_)
2506 | CatalogItem::Type(_)
2507 | CatalogItem::Func(_)
2508 | CatalogItem::Secret(_)
2509 | CatalogItem::Connection(_) => return None,
2510 };
2511 Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2512 }
2513
2514 pub fn is_retained_metrics_object(&self) -> bool {
2518 match self {
2519 CatalogItem::Table(table) => table.is_retained_metrics_object,
2520 CatalogItem::Source(source) => source.is_retained_metrics_object,
2521 CatalogItem::Index(index) => index.is_retained_metrics_object,
2522 CatalogItem::Log(_)
2523 | CatalogItem::View(_)
2524 | CatalogItem::MaterializedView(_)
2525 | CatalogItem::Sink(_)
2526 | CatalogItem::Type(_)
2527 | CatalogItem::Func(_)
2528 | CatalogItem::Secret(_)
2529 | CatalogItem::Connection(_) => false,
2530 }
2531 }
2532
2533 pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2534 match self {
2535 CatalogItem::Table(table) => {
2536 let create_sql = table
2537 .create_sql
2538 .clone()
2539 .expect("builtin tables cannot be serialized");
2540 let mut collections = table.collections.clone();
2541 let global_id = collections
2542 .remove(&RelationVersion::root())
2543 .expect("at least one version");
2544 (create_sql, global_id, collections)
2545 }
2546 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2547 CatalogItem::Source(source) => {
2548 assert!(
2549 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2550 "cannot serialize introspection/builtin sources",
2551 );
2552 let create_sql = source
2553 .create_sql
2554 .clone()
2555 .expect("builtin sources cannot be serialized");
2556 (create_sql, source.global_id, BTreeMap::new())
2557 }
2558 CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2559 CatalogItem::MaterializedView(mview) => {
2560 let mut collections = mview.collections.clone();
2561 let global_id = collections
2562 .remove(&RelationVersion::root())
2563 .expect("at least one version");
2564 (mview.create_sql.clone(), global_id, collections)
2565 }
2566 CatalogItem::Index(index) => {
2567 (index.create_sql.clone(), index.global_id, BTreeMap::new())
2568 }
2569 CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2570 CatalogItem::Type(typ) => {
2571 let create_sql = typ
2572 .create_sql
2573 .clone()
2574 .expect("builtin types cannot be serialized");
2575 (create_sql, typ.global_id, BTreeMap::new())
2576 }
2577 CatalogItem::Secret(secret) => {
2578 (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2579 }
2580 CatalogItem::Connection(connection) => (
2581 connection.create_sql.clone(),
2582 connection.global_id,
2583 BTreeMap::new(),
2584 ),
2585 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2586 }
2587 }
2588
2589 pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2590 match self {
2591 CatalogItem::Table(mut table) => {
2592 let create_sql = table
2593 .create_sql
2594 .expect("builtin tables cannot be serialized");
2595 let global_id = table
2596 .collections
2597 .remove(&RelationVersion::root())
2598 .expect("at least one version");
2599 (create_sql, global_id, table.collections)
2600 }
2601 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2602 CatalogItem::Source(source) => {
2603 assert!(
2604 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2605 "cannot serialize introspection/builtin sources",
2606 );
2607 let create_sql = source
2608 .create_sql
2609 .expect("builtin sources cannot be serialized");
2610 (create_sql, source.global_id, BTreeMap::new())
2611 }
2612 CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2613 CatalogItem::MaterializedView(mut mview) => {
2614 let global_id = mview
2615 .collections
2616 .remove(&RelationVersion::root())
2617 .expect("at least one version");
2618 (mview.create_sql, global_id, mview.collections)
2619 }
2620 CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2621 CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2622 CatalogItem::Type(typ) => {
2623 let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2624 (create_sql, typ.global_id, BTreeMap::new())
2625 }
2626 CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2627 CatalogItem::Connection(connection) => {
2628 (connection.create_sql, connection.global_id, BTreeMap::new())
2629 }
2630 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2631 }
2632 }
2633
2634 pub fn global_id_for_version(&self, version: RelationVersionSelector) -> Option<GlobalId> {
2637 let collections = match self {
2638 CatalogItem::MaterializedView(mv) => &mv.collections,
2639 CatalogItem::Table(table) => &table.collections,
2640 CatalogItem::Source(source) => return Some(source.global_id),
2641 CatalogItem::Log(log) => return Some(log.global_id),
2642 CatalogItem::View(view) => return Some(view.global_id),
2643 CatalogItem::Sink(sink) => return Some(sink.global_id),
2644 CatalogItem::Index(index) => return Some(index.global_id),
2645 CatalogItem::Type(ty) => return Some(ty.global_id),
2646 CatalogItem::Func(func) => return Some(func.global_id),
2647 CatalogItem::Secret(secret) => return Some(secret.global_id),
2648 CatalogItem::Connection(conn) => return Some(conn.global_id),
2649 };
2650 match version {
2651 RelationVersionSelector::Latest => collections.values().last().copied(),
2652 RelationVersionSelector::Specific(version) => collections.get(&version).copied(),
2653 }
2654 }
2655}
2656
2657impl CatalogEntry {
2658 pub fn relation_desc_latest(&self) -> Option<Cow<'_, RelationDesc>> {
2661 self.item.relation_desc(RelationVersionSelector::Latest)
2662 }
2663
2664 pub fn has_columns(&self) -> bool {
2666 match self.item() {
2667 CatalogItem::Type(Type { details, .. }) => {
2668 matches!(details.typ, CatalogType::Record { .. })
2669 }
2670 _ => self.relation_desc_latest().is_some(),
2671 }
2672 }
2673
2674 pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2676 self.item.func(self)
2677 }
2678
2679 pub fn index(&self) -> Option<&Index> {
2681 match self.item() {
2682 CatalogItem::Index(idx) => Some(idx),
2683 _ => None,
2684 }
2685 }
2686
2687 pub fn materialized_view(&self) -> Option<&MaterializedView> {
2689 match self.item() {
2690 CatalogItem::MaterializedView(mv) => Some(mv),
2691 _ => None,
2692 }
2693 }
2694
2695 pub fn table(&self) -> Option<&Table> {
2697 match self.item() {
2698 CatalogItem::Table(tbl) => Some(tbl),
2699 _ => None,
2700 }
2701 }
2702
2703 pub fn source(&self) -> Option<&Source> {
2705 match self.item() {
2706 CatalogItem::Source(src) => Some(src),
2707 _ => None,
2708 }
2709 }
2710
2711 pub fn sink(&self) -> Option<&Sink> {
2713 match self.item() {
2714 CatalogItem::Sink(sink) => Some(sink),
2715 _ => None,
2716 }
2717 }
2718
2719 pub fn secret(&self) -> Option<&Secret> {
2721 match self.item() {
2722 CatalogItem::Secret(secret) => Some(secret),
2723 _ => None,
2724 }
2725 }
2726
2727 pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2728 match self.item() {
2729 CatalogItem::Connection(connection) => Ok(connection),
2730 _ => {
2731 let db_name = match self.name().qualifiers.database_spec {
2732 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2733 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2734 };
2735 Err(SqlCatalogError::UnknownConnection(format!(
2736 "{}{}.{}",
2737 db_name,
2738 self.name().qualifiers.schema_spec,
2739 self.name().item
2740 )))
2741 }
2742 }
2743 }
2744
2745 pub fn source_desc(
2748 &self,
2749 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2750 self.item.source_desc(self)
2751 }
2752
2753 pub fn is_connection(&self) -> bool {
2755 matches!(self.item(), CatalogItem::Connection(_))
2756 }
2757
2758 pub fn is_table(&self) -> bool {
2760 matches!(self.item(), CatalogItem::Table(_))
2761 }
2762
2763 pub fn is_source(&self) -> bool {
2766 matches!(self.item(), CatalogItem::Source(_))
2767 }
2768
2769 pub fn subsource_details(
2772 &self,
2773 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2774 match &self.item() {
2775 CatalogItem::Source(source) => match &source.data_source {
2776 DataSourceDesc::IngestionExport {
2777 ingestion_id,
2778 external_reference,
2779 details,
2780 data_config: _,
2781 } => Some((*ingestion_id, external_reference, details)),
2782 _ => None,
2783 },
2784 _ => None,
2785 }
2786 }
2787
2788 pub fn source_export_details(
2791 &self,
2792 ) -> Option<(
2793 CatalogItemId,
2794 &UnresolvedItemName,
2795 &SourceExportDetails,
2796 &SourceExportDataConfig<ReferencedConnection>,
2797 )> {
2798 match &self.item() {
2799 CatalogItem::Source(source) => match &source.data_source {
2800 DataSourceDesc::IngestionExport {
2801 ingestion_id,
2802 external_reference,
2803 details,
2804 data_config,
2805 } => Some((*ingestion_id, external_reference, details, data_config)),
2806 _ => None,
2807 },
2808 CatalogItem::Table(table) => match &table.data_source {
2809 TableDataSource::DataSource {
2810 desc:
2811 DataSourceDesc::IngestionExport {
2812 ingestion_id,
2813 external_reference,
2814 details,
2815 data_config,
2816 },
2817 timeline: _,
2818 } => Some((*ingestion_id, external_reference, details, data_config)),
2819 _ => None,
2820 },
2821 _ => None,
2822 }
2823 }
2824
2825 pub fn is_progress_source(&self) -> bool {
2827 self.item().is_progress_source()
2828 }
2829
2830 pub fn progress_id(&self) -> Option<CatalogItemId> {
2832 match &self.item() {
2833 CatalogItem::Source(source) => match &source.data_source {
2834 DataSourceDesc::Ingestion { .. } => Some(self.id),
2835 DataSourceDesc::OldSyntaxIngestion {
2836 progress_subsource, ..
2837 } => Some(*progress_subsource),
2838 DataSourceDesc::IngestionExport { .. }
2839 | DataSourceDesc::Introspection(_)
2840 | DataSourceDesc::Progress
2841 | DataSourceDesc::Webhook { .. }
2842 | DataSourceDesc::Catalog => None,
2843 },
2844 CatalogItem::Table(_)
2845 | CatalogItem::Log(_)
2846 | CatalogItem::View(_)
2847 | CatalogItem::MaterializedView(_)
2848 | CatalogItem::Sink(_)
2849 | CatalogItem::Index(_)
2850 | CatalogItem::Type(_)
2851 | CatalogItem::Func(_)
2852 | CatalogItem::Secret(_)
2853 | CatalogItem::Connection(_) => None,
2854 }
2855 }
2856
2857 pub fn is_sink(&self) -> bool {
2859 matches!(self.item(), CatalogItem::Sink(_))
2860 }
2861
2862 pub fn is_materialized_view(&self) -> bool {
2864 matches!(self.item(), CatalogItem::MaterializedView(_))
2865 }
2866
2867 pub fn is_view(&self) -> bool {
2869 matches!(self.item(), CatalogItem::View(_))
2870 }
2871
2872 pub fn is_secret(&self) -> bool {
2874 matches!(self.item(), CatalogItem::Secret(_))
2875 }
2876
2877 pub fn is_introspection_source(&self) -> bool {
2879 matches!(self.item(), CatalogItem::Log(_))
2880 }
2881
2882 pub fn is_index(&self) -> bool {
2884 matches!(self.item(), CatalogItem::Index(_))
2885 }
2886
2887 pub fn is_relation(&self) -> bool {
2889 mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2890 }
2891
2892 pub fn references(&self) -> &ResolvedIds {
2895 self.item.references()
2896 }
2897
2898 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2904 self.item.uses()
2905 }
2906
2907 pub fn item(&self) -> &CatalogItem {
2909 &self.item
2910 }
2911
2912 pub fn item_mut(&mut self) -> &mut CatalogItem {
2915 &mut self.item
2916 }
2917
2918 pub fn id(&self) -> CatalogItemId {
2920 self.id
2921 }
2922
2923 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2925 self.item().global_ids()
2926 }
2927
2928 pub fn latest_global_id(&self) -> GlobalId {
2929 self.item().latest_global_id()
2930 }
2931
2932 pub fn oid(&self) -> u32 {
2934 self.oid
2935 }
2936
2937 pub fn name(&self) -> &QualifiedItemName {
2939 &self.name
2940 }
2941
2942 pub fn referenced_by(&self) -> &[CatalogItemId] {
2944 &self.referenced_by
2945 }
2946
2947 pub fn used_by(&self) -> &[CatalogItemId] {
2949 &self.used_by
2950 }
2951
2952 pub fn conn_id(&self) -> Option<&ConnectionId> {
2955 self.item.conn_id()
2956 }
2957
2958 pub fn owner_id(&self) -> &RoleId {
2960 &self.owner_id
2961 }
2962
2963 pub fn privileges(&self) -> &PrivilegeMap {
2965 &self.privileges
2966 }
2967
2968 pub fn comment_object_id(&self) -> CommentObjectId {
2970 use CatalogItemType::*;
2971 match self.item_type() {
2972 Table => CommentObjectId::Table(self.id),
2973 Source => CommentObjectId::Source(self.id),
2974 Sink => CommentObjectId::Sink(self.id),
2975 View => CommentObjectId::View(self.id),
2976 MaterializedView => CommentObjectId::MaterializedView(self.id),
2977 Index => CommentObjectId::Index(self.id),
2978 Func => CommentObjectId::Func(self.id),
2979 Connection => CommentObjectId::Connection(self.id),
2980 Type => CommentObjectId::Type(self.id),
2981 Secret => CommentObjectId::Secret(self.id),
2982 }
2983 }
2984}
2985
2986#[derive(Debug, Clone, Default)]
2987pub struct CommentsMap {
2988 map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
2989}
2990
2991impl CommentsMap {
2992 pub fn update_comment(
2993 &mut self,
2994 object_id: CommentObjectId,
2995 sub_component: Option<usize>,
2996 comment: Option<String>,
2997 ) -> Option<String> {
2998 let object_comments = self.map.entry(object_id).or_default();
2999
3000 let (empty, prev) = if let Some(comment) = comment {
3002 let prev = object_comments.insert(sub_component, comment);
3003 (false, prev)
3004 } else {
3005 let prev = object_comments.remove(&sub_component);
3006 (object_comments.is_empty(), prev)
3007 };
3008
3009 if empty {
3011 self.map.remove(&object_id);
3012 }
3013
3014 prev
3016 }
3017
3018 pub fn drop_comments(
3024 &mut self,
3025 object_ids: &BTreeSet<CommentObjectId>,
3026 ) -> Vec<(CommentObjectId, Option<usize>, String)> {
3027 let mut removed_comments = Vec::new();
3028
3029 for object_id in object_ids {
3030 if let Some(comments) = self.map.remove(object_id) {
3031 let removed = comments
3032 .into_iter()
3033 .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
3034 removed_comments.extend(removed);
3035 }
3036 }
3037
3038 removed_comments
3039 }
3040
3041 pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
3042 self.map
3043 .iter()
3044 .map(|(id, comments)| {
3045 comments
3046 .iter()
3047 .map(|(pos, comment)| (*id, *pos, comment.as_str()))
3048 })
3049 .flatten()
3050 }
3051
3052 pub fn get_object_comments(
3053 &self,
3054 object_id: CommentObjectId,
3055 ) -> Option<&BTreeMap<Option<usize>, String>> {
3056 self.map.get(&object_id)
3057 }
3058}
3059
3060impl Serialize for CommentsMap {
3061 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
3062 where
3063 S: serde::Serializer,
3064 {
3065 let comment_count = self
3066 .map
3067 .iter()
3068 .map(|(_object_id, comments)| comments.len())
3069 .sum();
3070
3071 let mut seq = serializer.serialize_seq(Some(comment_count))?;
3072 for (object_id, sub) in &self.map {
3073 for (sub_component, comment) in sub {
3074 seq.serialize_element(&(
3075 format!("{object_id:?}"),
3076 format!("{sub_component:?}"),
3077 comment,
3078 ))?;
3079 }
3080 }
3081 seq.end()
3082 }
3083}
3084
3085#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
3086pub struct DefaultPrivileges {
3087 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
3088 privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
3089}
3090
3091#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
3094struct RoleDefaultPrivileges(
3095 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
3097 BTreeMap<RoleId, DefaultPrivilegeAclItem>,
3098);
3099
3100impl Deref for RoleDefaultPrivileges {
3101 type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
3102
3103 fn deref(&self) -> &Self::Target {
3104 &self.0
3105 }
3106}
3107
3108impl DerefMut for RoleDefaultPrivileges {
3109 fn deref_mut(&mut self) -> &mut Self::Target {
3110 &mut self.0
3111 }
3112}
3113
3114impl DefaultPrivileges {
3115 pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
3117 if privilege.acl_mode.is_empty() {
3118 return;
3119 }
3120
3121 let privileges = self.privileges.entry(object).or_default();
3122 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3123 default_privilege.acl_mode |= privilege.acl_mode;
3124 } else {
3125 privileges.insert(privilege.grantee, privilege);
3126 }
3127 }
3128
3129 pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
3131 if let Some(privileges) = self.privileges.get_mut(object) {
3132 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3133 default_privilege.acl_mode =
3134 default_privilege.acl_mode.difference(privilege.acl_mode);
3135 if default_privilege.acl_mode.is_empty() {
3136 privileges.remove(&privilege.grantee);
3137 }
3138 }
3139 if privileges.is_empty() {
3140 self.privileges.remove(object);
3141 }
3142 }
3143 }
3144
3145 pub fn get_privileges_for_grantee(
3148 &self,
3149 object: &DefaultPrivilegeObject,
3150 grantee: &RoleId,
3151 ) -> Option<&AclMode> {
3152 self.privileges
3153 .get(object)
3154 .and_then(|privileges| privileges.get(grantee))
3155 .map(|privilege| &privilege.acl_mode)
3156 }
3157
3158 pub fn get_applicable_privileges(
3160 &self,
3161 role_id: RoleId,
3162 database_id: Option<DatabaseId>,
3163 schema_id: Option<SchemaId>,
3164 object_type: mz_sql::catalog::ObjectType,
3165 ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
3166 let privilege_object_type = if object_type.is_relation() {
3170 mz_sql::catalog::ObjectType::Table
3171 } else {
3172 object_type
3173 };
3174 let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
3175
3176 [
3180 DefaultPrivilegeObject {
3181 role_id,
3182 database_id,
3183 schema_id,
3184 object_type: privilege_object_type,
3185 },
3186 DefaultPrivilegeObject {
3187 role_id,
3188 database_id,
3189 schema_id: None,
3190 object_type: privilege_object_type,
3191 },
3192 DefaultPrivilegeObject {
3193 role_id,
3194 database_id: None,
3195 schema_id: None,
3196 object_type: privilege_object_type,
3197 },
3198 DefaultPrivilegeObject {
3199 role_id: RoleId::Public,
3200 database_id,
3201 schema_id,
3202 object_type: privilege_object_type,
3203 },
3204 DefaultPrivilegeObject {
3205 role_id: RoleId::Public,
3206 database_id,
3207 schema_id: None,
3208 object_type: privilege_object_type,
3209 },
3210 DefaultPrivilegeObject {
3211 role_id: RoleId::Public,
3212 database_id: None,
3213 schema_id: None,
3214 object_type: privilege_object_type,
3215 },
3216 ]
3217 .into_iter()
3218 .filter_map(|object| self.privileges.get(&object))
3219 .flat_map(|acl_map| acl_map.values())
3220 .fold(
3222 BTreeMap::new(),
3223 |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
3224 let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
3225 *accum_acl_mode |= *acl_mode;
3226 accum
3227 },
3228 )
3229 .into_iter()
3230 .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
3235 .filter(|(_, acl_mode)| !acl_mode.is_empty())
3237 .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
3238 grantee: *grantee,
3239 acl_mode,
3240 })
3241 }
3242
3243 pub fn iter(
3244 &self,
3245 ) -> impl Iterator<
3246 Item = (
3247 &DefaultPrivilegeObject,
3248 impl Iterator<Item = &DefaultPrivilegeAclItem>,
3249 ),
3250 > {
3251 self.privileges
3252 .iter()
3253 .map(|(object, acl_map)| (object, acl_map.values()))
3254 }
3255}
3256
3257#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3258pub struct ClusterConfig {
3259 pub variant: ClusterVariant,
3260 pub workload_class: Option<String>,
3261}
3262
3263impl ClusterConfig {
3264 pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
3265 match &self.variant {
3266 ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
3267 ClusterVariant::Unmanaged => None,
3268 }
3269 }
3270}
3271
3272impl From<ClusterConfig> for durable::ClusterConfig {
3273 fn from(config: ClusterConfig) -> Self {
3274 Self {
3275 variant: config.variant.into(),
3276 workload_class: config.workload_class,
3277 }
3278 }
3279}
3280
3281impl From<durable::ClusterConfig> for ClusterConfig {
3282 fn from(config: durable::ClusterConfig) -> Self {
3283 Self {
3284 variant: config.variant.into(),
3285 workload_class: config.workload_class,
3286 }
3287 }
3288}
3289
3290#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3291pub struct ClusterVariantManaged {
3292 pub size: String,
3293 pub availability_zones: Vec<String>,
3294 pub logging: ReplicaLogging,
3295 pub replication_factor: u32,
3296 pub optimizer_feature_overrides: OptimizerFeatureOverrides,
3297 pub schedule: ClusterSchedule,
3298 pub auto_scaling_strategy: Option<AutoScalingStrategy>,
3301 pub reconfiguration: Option<ReconfigurationState>,
3303 pub burst: Option<BurstState>,
3305}
3306
3307impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3308 fn from(managed: ClusterVariantManaged) -> Self {
3309 let ClusterVariantManaged {
3312 size,
3313 availability_zones,
3314 logging,
3315 replication_factor,
3316 optimizer_feature_overrides,
3317 schedule,
3318 auto_scaling_strategy,
3319 reconfiguration,
3320 burst,
3321 } = managed;
3322 Self {
3323 size,
3324 availability_zones,
3325 logging,
3326 replication_factor,
3327 optimizer_feature_overrides: optimizer_feature_overrides.into(),
3328 schedule,
3329 auto_scaling_strategy,
3330 reconfiguration: reconfiguration.map(Into::into),
3331 burst: burst.map(Into::into),
3332 }
3333 }
3334}
3335
3336impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3337 fn from(managed: durable::ClusterVariantManaged) -> Self {
3338 let durable::ClusterVariantManaged {
3341 size,
3342 availability_zones,
3343 logging,
3344 replication_factor,
3345 optimizer_feature_overrides,
3346 schedule,
3347 auto_scaling_strategy,
3348 reconfiguration,
3349 burst,
3350 } = managed;
3351 Self {
3352 size,
3353 availability_zones,
3354 logging,
3355 replication_factor,
3356 optimizer_feature_overrides: optimizer_feature_overrides.into(),
3357 schedule,
3358 auto_scaling_strategy,
3359 reconfiguration: reconfiguration.map(Into::into),
3360 burst: burst.map(Into::into),
3361 }
3362 }
3363}
3364
3365#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3372pub struct ReconfigurationState {
3373 pub target: ReconfigurationTarget,
3374 pub deadline: Timestamp,
3375 pub on_timeout: OnTimeoutAction,
3376}
3377
3378impl From<ReconfigurationState> for durable::ReconfigurationState {
3379 fn from(state: ReconfigurationState) -> Self {
3380 let ReconfigurationState {
3383 target,
3384 deadline,
3385 on_timeout,
3386 } = state;
3387 Self {
3388 target: target.into(),
3389 deadline,
3390 on_timeout,
3391 }
3392 }
3393}
3394
3395impl From<durable::ReconfigurationState> for ReconfigurationState {
3396 fn from(state: durable::ReconfigurationState) -> Self {
3397 let durable::ReconfigurationState {
3400 target,
3401 deadline,
3402 on_timeout,
3403 } = state;
3404 Self {
3405 target: target.into(),
3406 deadline,
3407 on_timeout,
3408 }
3409 }
3410}
3411
3412#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3414pub struct ReconfigurationTarget {
3415 pub size: String,
3416 pub replication_factor: u32,
3417 pub availability_zones: Vec<String>,
3418 pub logging: ReplicaLogging,
3419}
3420
3421impl From<ReconfigurationTarget> for durable::ReconfigurationTarget {
3422 fn from(target: ReconfigurationTarget) -> Self {
3423 let ReconfigurationTarget {
3426 size,
3427 replication_factor,
3428 availability_zones,
3429 logging,
3430 } = target;
3431 Self {
3432 size,
3433 replication_factor,
3434 availability_zones,
3435 logging,
3436 }
3437 }
3438}
3439
3440impl From<durable::ReconfigurationTarget> for ReconfigurationTarget {
3441 fn from(target: durable::ReconfigurationTarget) -> Self {
3442 let durable::ReconfigurationTarget {
3445 size,
3446 replication_factor,
3447 availability_zones,
3448 logging,
3449 } = target;
3450 Self {
3451 size,
3452 replication_factor,
3453 availability_zones,
3454 logging,
3455 }
3456 }
3457}
3458
3459#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3461pub struct BurstState {
3462 pub burst_size: String,
3463 pub linger_duration: Duration,
3464 pub steady_hydrated_at: Option<Timestamp>,
3465}
3466
3467impl From<BurstState> for durable::BurstState {
3468 fn from(burst: BurstState) -> Self {
3469 let BurstState {
3472 burst_size,
3473 linger_duration,
3474 steady_hydrated_at,
3475 } = burst;
3476 Self {
3477 burst_size,
3478 linger_duration,
3479 steady_hydrated_at,
3480 }
3481 }
3482}
3483
3484impl From<durable::BurstState> for BurstState {
3485 fn from(burst: durable::BurstState) -> Self {
3486 let durable::BurstState {
3489 burst_size,
3490 linger_duration,
3491 steady_hydrated_at,
3492 } = burst;
3493 Self {
3494 burst_size,
3495 linger_duration,
3496 steady_hydrated_at,
3497 }
3498 }
3499}
3500
3501#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3502pub enum ClusterVariant {
3503 Managed(ClusterVariantManaged),
3504 Unmanaged,
3505}
3506
3507impl From<ClusterVariant> for durable::ClusterVariant {
3508 fn from(variant: ClusterVariant) -> Self {
3509 match variant {
3510 ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3511 ClusterVariant::Unmanaged => Self::Unmanaged,
3512 }
3513 }
3514}
3515
3516impl From<durable::ClusterVariant> for ClusterVariant {
3517 fn from(variant: durable::ClusterVariant) -> Self {
3518 match variant {
3519 durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3520 durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3521 }
3522 }
3523}
3524
3525impl mz_sql::catalog::CatalogDatabase for Database {
3526 fn name(&self) -> &str {
3527 &self.name
3528 }
3529
3530 fn id(&self) -> DatabaseId {
3531 self.id
3532 }
3533
3534 fn has_schemas(&self) -> bool {
3535 !self.schemas_by_name.is_empty()
3536 }
3537
3538 fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
3539 &self.schemas_by_name
3540 }
3541
3542 #[allow(clippy::as_conversions)]
3544 fn schemas(&self) -> Vec<&dyn CatalogSchema> {
3545 self.schemas_by_id
3546 .values()
3547 .map(|schema| schema as &dyn CatalogSchema)
3548 .collect()
3549 }
3550
3551 fn owner_id(&self) -> RoleId {
3552 self.owner_id
3553 }
3554
3555 fn privileges(&self) -> &PrivilegeMap {
3556 &self.privileges
3557 }
3558}
3559
3560impl mz_sql::catalog::CatalogSchema for Schema {
3561 fn database(&self) -> &ResolvedDatabaseSpecifier {
3562 &self.name.database
3563 }
3564
3565 fn name(&self) -> &QualifiedSchemaName {
3566 &self.name
3567 }
3568
3569 fn id(&self) -> &SchemaSpecifier {
3570 &self.id
3571 }
3572
3573 fn has_items(&self) -> bool {
3574 !self.items.is_empty()
3575 }
3576
3577 fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
3578 Box::new(
3579 self.items
3580 .values()
3581 .chain(self.functions.values())
3582 .chain(self.types.values())
3583 .copied(),
3584 )
3585 }
3586
3587 fn owner_id(&self) -> RoleId {
3588 self.owner_id
3589 }
3590
3591 fn privileges(&self) -> &PrivilegeMap {
3592 &self.privileges
3593 }
3594}
3595
3596impl mz_sql::catalog::CatalogRole for Role {
3597 fn name(&self) -> &str {
3598 &self.name
3599 }
3600
3601 fn id(&self) -> RoleId {
3602 self.id
3603 }
3604
3605 fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
3606 &self.membership.map
3607 }
3608
3609 fn attributes(&self) -> &RoleAttributes {
3610 &self.attributes
3611 }
3612
3613 fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
3614 &self.vars.map
3615 }
3616}
3617
3618impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
3619 fn name(&self) -> &str {
3620 &self.name
3621 }
3622
3623 fn id(&self) -> NetworkPolicyId {
3624 self.id
3625 }
3626
3627 fn owner_id(&self) -> RoleId {
3628 self.owner_id
3629 }
3630
3631 fn privileges(&self) -> &PrivilegeMap {
3632 &self.privileges
3633 }
3634}
3635
3636impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
3637 fn name(&self) -> &str {
3638 &self.name
3639 }
3640
3641 fn id(&self) -> ClusterId {
3642 self.id
3643 }
3644
3645 fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
3646 &self.bound_objects
3647 }
3648
3649 fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
3650 &self.replica_id_by_name_
3651 }
3652
3653 #[allow(clippy::as_conversions)]
3655 fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
3656 self.replicas()
3657 .map(|replica| replica as &dyn CatalogClusterReplica)
3658 .collect()
3659 }
3660
3661 fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
3662 self.replica(id).expect("catalog out of sync")
3663 }
3664
3665 fn owner_id(&self) -> RoleId {
3666 self.owner_id
3667 }
3668
3669 fn privileges(&self) -> &PrivilegeMap {
3670 &self.privileges
3671 }
3672
3673 fn is_managed(&self) -> bool {
3674 self.is_managed()
3675 }
3676
3677 fn managed_size(&self) -> Option<&str> {
3678 match &self.config.variant {
3679 ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
3680 ClusterVariant::Unmanaged => None,
3681 }
3682 }
3683
3684 fn schedule(&self) -> Option<&ClusterSchedule> {
3685 match &self.config.variant {
3686 ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
3687 ClusterVariant::Unmanaged => None,
3688 }
3689 }
3690
3691 fn replication_factor(&self) -> Option<u32> {
3692 match &self.config.variant {
3693 ClusterVariant::Managed(ClusterVariantManaged {
3694 replication_factor, ..
3695 }) => Some(*replication_factor),
3696 ClusterVariant::Unmanaged => None,
3697 }
3698 }
3699
3700 fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
3701 self.try_to_plan()
3702 }
3703}
3704
3705impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
3706 fn name(&self) -> &str {
3707 &self.name
3708 }
3709
3710 fn cluster_id(&self) -> ClusterId {
3711 self.cluster_id
3712 }
3713
3714 fn replica_id(&self) -> ReplicaId {
3715 self.replica_id
3716 }
3717
3718 fn owner_id(&self) -> RoleId {
3719 self.owner_id
3720 }
3721
3722 fn internal(&self) -> bool {
3723 self.config.location.internal()
3724 }
3725}
3726
3727impl mz_sql::catalog::CatalogItem for CatalogEntry {
3728 fn name(&self) -> &QualifiedItemName {
3729 self.name()
3730 }
3731
3732 fn id(&self) -> CatalogItemId {
3733 self.id()
3734 }
3735
3736 fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3737 Box::new(self.global_ids())
3738 }
3739
3740 fn oid(&self) -> u32 {
3741 self.oid()
3742 }
3743
3744 fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3745 self.func()
3746 }
3747
3748 fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3749 self.source_desc()
3750 }
3751
3752 fn connection(
3753 &self,
3754 ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3755 {
3756 Ok(self.connection()?.details.to_connection())
3757 }
3758
3759 fn create_sql(&self) -> &str {
3760 match self.item() {
3761 CatalogItem::Table(Table { create_sql, .. }) => {
3762 create_sql.as_deref().unwrap_or("<builtin>")
3763 }
3764 CatalogItem::Source(Source { create_sql, .. }) => {
3765 create_sql.as_deref().unwrap_or("<builtin>")
3766 }
3767 CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3768 CatalogItem::View(View { create_sql, .. }) => create_sql,
3769 CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3770 CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3771 CatalogItem::Type(Type { create_sql, .. }) => {
3772 create_sql.as_deref().unwrap_or("<builtin>")
3773 }
3774 CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3775 CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3776 CatalogItem::Func(_) => "<builtin>",
3777 CatalogItem::Log(_) => "<builtin>",
3778 }
3779 }
3780
3781 fn item_type(&self) -> SqlCatalogItemType {
3782 self.item().typ()
3783 }
3784
3785 fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3786 if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3787 Some((keys, *on))
3788 } else {
3789 None
3790 }
3791 }
3792
3793 fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3794 if let CatalogItem::Table(Table {
3795 data_source: TableDataSource::TableWrites { defaults },
3796 ..
3797 }) = self.item()
3798 {
3799 Some(defaults.as_slice())
3800 } else {
3801 None
3802 }
3803 }
3804
3805 fn replacement_target(&self) -> Option<CatalogItemId> {
3806 if let CatalogItem::MaterializedView(mv) = self.item() {
3807 mv.replacement_target
3808 } else {
3809 None
3810 }
3811 }
3812
3813 fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3814 if let CatalogItem::Type(Type { details, .. }) = self.item() {
3815 Some(details)
3816 } else {
3817 None
3818 }
3819 }
3820
3821 fn references(&self) -> &ResolvedIds {
3822 self.references()
3823 }
3824
3825 fn uses(&self) -> BTreeSet<CatalogItemId> {
3826 self.uses()
3827 }
3828
3829 fn referenced_by(&self) -> &[CatalogItemId] {
3830 self.referenced_by()
3831 }
3832
3833 fn used_by(&self) -> &[CatalogItemId] {
3834 self.used_by()
3835 }
3836
3837 fn subsource_details(
3838 &self,
3839 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3840 self.subsource_details()
3841 }
3842
3843 fn source_export_details(
3844 &self,
3845 ) -> Option<(
3846 CatalogItemId,
3847 &UnresolvedItemName,
3848 &SourceExportDetails,
3849 &SourceExportDataConfig<ReferencedConnection>,
3850 )> {
3851 self.source_export_details()
3852 }
3853
3854 fn is_progress_source(&self) -> bool {
3855 self.is_progress_source()
3856 }
3857
3858 fn progress_id(&self) -> Option<CatalogItemId> {
3859 self.progress_id()
3860 }
3861
3862 fn owner_id(&self) -> RoleId {
3863 self.owner_id
3864 }
3865
3866 fn privileges(&self) -> &PrivilegeMap {
3867 &self.privileges
3868 }
3869
3870 fn cluster_id(&self) -> Option<ClusterId> {
3871 self.item().cluster_id()
3872 }
3873
3874 fn at_version(
3875 &self,
3876 version: RelationVersionSelector,
3877 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3878 Box::new(CatalogCollectionEntry {
3879 entry: self.clone(),
3880 version,
3881 })
3882 }
3883
3884 fn latest_version(&self) -> Option<RelationVersion> {
3885 self.table().map(|t| t.desc.latest_version())
3886 }
3887}
3888
3889#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
3891pub struct StateUpdate {
3892 pub kind: StateUpdateKind,
3893 pub ts: Timestamp,
3894 pub diff: StateDiff,
3895}
3896
3897#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
3901pub enum StateUpdateKind {
3902 Role(durable::objects::Role),
3903 RoleAuth(durable::objects::RoleAuth),
3904 Database(durable::objects::Database),
3905 Schema(durable::objects::Schema),
3906 DefaultPrivilege(durable::objects::DefaultPrivilege),
3907 SystemPrivilege(MzAclItem),
3908 SystemConfiguration(durable::objects::SystemConfiguration),
3909 Cluster(durable::objects::Cluster),
3910 ClusterSystemConfiguration(durable::objects::ClusterSystemConfiguration),
3911 NetworkPolicy(durable::objects::NetworkPolicy),
3912 IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3913 ClusterReplica(durable::objects::ClusterReplica),
3914 ReplicaSystemConfiguration(durable::objects::ReplicaSystemConfiguration),
3915 SourceReferences(durable::objects::SourceReferences),
3916 SystemObjectMapping(durable::objects::SystemObjectMapping),
3917 TemporaryItem(TemporaryItem),
3921 Item(durable::objects::Item),
3922 Comment(durable::objects::Comment),
3923 AuditLog(durable::objects::AuditLog),
3924 StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3926 UnfinalizedShard(durable::objects::UnfinalizedShard),
3927}
3928
/// The direction of a state update: either a retraction or an addition.
///
/// Variant order is significant: the derived ordering sorts `Retraction` before `Addition`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum StateDiff {
    /// The update removes the object.
    Retraction,
    /// The update adds the object.
    Addition,
}
3935
3936impl From<StateDiff> for Diff {
3937 fn from(diff: StateDiff) -> Self {
3938 match diff {
3939 StateDiff::Retraction => Diff::MINUS_ONE,
3940 StateDiff::Addition => Diff::ONE,
3941 }
3942 }
3943}
3944impl TryFrom<Diff> for StateDiff {
3945 type Error = String;
3946
3947 fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3948 match diff {
3949 Diff::MINUS_ONE => Ok(Self::Retraction),
3950 Diff::ONE => Ok(Self::Addition),
3951 diff => Err(format!("invalid diff {diff}")),
3952 }
3953 }
3954}
3955
3956#[derive(Debug, Clone, Ord, PartialOrd, PartialEq, Eq)]
3958pub struct TemporaryItem {
3959 pub id: CatalogItemId,
3960 pub oid: u32,
3961 pub global_id: GlobalId,
3962 pub schema_id: SchemaId,
3963 pub name: String,
3964 pub conn_id: Option<ConnectionId>,
3965 pub create_sql: String,
3966 pub owner_id: RoleId,
3967 pub privileges: Vec<MzAclItem>,
3968 pub extra_versions: BTreeMap<RelationVersion, GlobalId>,
3969}
3970
3971impl From<CatalogEntry> for TemporaryItem {
3972 fn from(entry: CatalogEntry) -> Self {
3973 let conn_id = entry.conn_id().cloned();
3974 let (create_sql, global_id, extra_versions) = entry.item.to_serialized();
3975
3976 TemporaryItem {
3977 id: entry.id,
3978 oid: entry.oid,
3979 global_id,
3980 schema_id: entry.name.qualifiers.schema_spec.into(),
3981 name: entry.name.item,
3982 conn_id,
3983 create_sql,
3984 owner_id: entry.owner_id,
3985 privileges: entry.privileges.into_all_values().collect(),
3986 extra_versions,
3987 }
3988 }
3989}
3990
3991impl TemporaryItem {
3992 pub fn item_type(&self) -> CatalogItemType {
3993 item_type(&self.create_sql)
3994 }
3995}
3996
3997#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
3999pub enum BootstrapStateUpdateKind {
4000 Role(durable::objects::Role),
4001 RoleAuth(durable::objects::RoleAuth),
4002 Database(durable::objects::Database),
4003 Schema(durable::objects::Schema),
4004 DefaultPrivilege(durable::objects::DefaultPrivilege),
4005 SystemPrivilege(MzAclItem),
4006 SystemConfiguration(durable::objects::SystemConfiguration),
4007 Cluster(durable::objects::Cluster),
4008 ClusterSystemConfiguration(durable::objects::ClusterSystemConfiguration),
4009 NetworkPolicy(durable::objects::NetworkPolicy),
4010 IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
4011 ClusterReplica(durable::objects::ClusterReplica),
4012 ReplicaSystemConfiguration(durable::objects::ReplicaSystemConfiguration),
4013 SourceReferences(durable::objects::SourceReferences),
4014 SystemObjectMapping(durable::objects::SystemObjectMapping),
4015 Item(durable::objects::Item),
4016 Comment(durable::objects::Comment),
4017 AuditLog(durable::objects::AuditLog),
4018 StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
4020 UnfinalizedShard(durable::objects::UnfinalizedShard),
4021}
4022
4023impl From<BootstrapStateUpdateKind> for StateUpdateKind {
4024 fn from(value: BootstrapStateUpdateKind) -> Self {
4025 match value {
4026 BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
4027 BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
4028 BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
4029 BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
4030 BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
4031 StateUpdateKind::DefaultPrivilege(kind)
4032 }
4033 BootstrapStateUpdateKind::SystemPrivilege(kind) => {
4034 StateUpdateKind::SystemPrivilege(kind)
4035 }
4036 BootstrapStateUpdateKind::SystemConfiguration(kind) => {
4037 StateUpdateKind::SystemConfiguration(kind)
4038 }
4039 BootstrapStateUpdateKind::ClusterSystemConfiguration(kind) => {
4040 StateUpdateKind::ClusterSystemConfiguration(kind)
4041 }
4042 BootstrapStateUpdateKind::ReplicaSystemConfiguration(kind) => {
4043 StateUpdateKind::ReplicaSystemConfiguration(kind)
4044 }
4045 BootstrapStateUpdateKind::SourceReferences(kind) => {
4046 StateUpdateKind::SourceReferences(kind)
4047 }
4048 BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
4049 BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
4050 BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
4051 StateUpdateKind::IntrospectionSourceIndex(kind)
4052 }
4053 BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
4054 BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
4055 StateUpdateKind::SystemObjectMapping(kind)
4056 }
4057 BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
4058 BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
4059 BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
4060 BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
4061 StateUpdateKind::StorageCollectionMetadata(kind)
4062 }
4063 BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
4064 StateUpdateKind::UnfinalizedShard(kind)
4065 }
4066 }
4067 }
4068}
4069
4070impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
4071 type Error = TemporaryItem;
4072
4073 fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
4074 match value {
4075 StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
4076 StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
4077 StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
4078 StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
4079 StateUpdateKind::DefaultPrivilege(kind) => {
4080 Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
4081 }
4082 StateUpdateKind::SystemPrivilege(kind) => {
4083 Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
4084 }
4085 StateUpdateKind::SystemConfiguration(kind) => {
4086 Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
4087 }
4088 StateUpdateKind::ClusterSystemConfiguration(kind) => {
4089 Ok(BootstrapStateUpdateKind::ClusterSystemConfiguration(kind))
4090 }
4091 StateUpdateKind::ReplicaSystemConfiguration(kind) => {
4092 Ok(BootstrapStateUpdateKind::ReplicaSystemConfiguration(kind))
4093 }
4094 StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
4095 StateUpdateKind::NetworkPolicy(kind) => {
4096 Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
4097 }
4098 StateUpdateKind::IntrospectionSourceIndex(kind) => {
4099 Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
4100 }
4101 StateUpdateKind::ClusterReplica(kind) => {
4102 Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
4103 }
4104 StateUpdateKind::SourceReferences(kind) => {
4105 Ok(BootstrapStateUpdateKind::SourceReferences(kind))
4106 }
4107 StateUpdateKind::SystemObjectMapping(kind) => {
4108 Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
4109 }
4110 StateUpdateKind::TemporaryItem(kind) => Err(kind),
4111 StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
4112 StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
4113 StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
4114 StateUpdateKind::StorageCollectionMetadata(kind) => {
4115 Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
4116 }
4117 StateUpdateKind::UnfinalizedShard(kind) => {
4118 Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
4119 }
4120 }
4121 }
4122}