1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_compute_client::logging::LogVariant;
24use mz_compute_types::dataflows::DataflowDescription;
25use mz_compute_types::plan::Plan as ComputePlan;
26use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
27use mz_controller_types::{ClusterId, ReplicaId};
28use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
29use mz_ore::collections::CollectionExt;
30use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
31use mz_repr::network_policy_id::NetworkPolicyId;
32use mz_repr::optimize::OptimizerFeatureOverrides;
33use mz_repr::refresh_schedule::RefreshSchedule;
34use mz_repr::role_id::RoleId;
35use mz_repr::{
36 CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
37 RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
38};
39use mz_sql::ast::display::AstDisplay;
40use mz_sql::ast::{
41 ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
42 UnresolvedItemName, Value, WithOptionValue,
43};
44use mz_sql::catalog::{
45 CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
46 CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogType,
47 CatalogTypeDetails, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference,
48 RoleAttributes, RoleMembership, RoleVars, SystemObjectType,
49};
50use mz_sql::names::{
51 Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
52 QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
53};
54use mz_sql::plan::{
55 ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
56 CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
57 HirRelationExpr, NetworkPolicyRule, PlanError, WebhookBodyFormat, WebhookHeaders,
58 WebhookValidation,
59};
60use mz_sql::rbac;
61use mz_sql::session::vars::OwnedVarInput;
62use mz_storage_client::controller::IntrospectionType;
63use mz_storage_types::connections::inline::ReferencedConnection;
64use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
65use mz_storage_types::sources::load_generator::LoadGenerator;
66use mz_storage_types::sources::{
67 GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
68 SourceExportDetails, Timeline,
69};
70use mz_transform::dataflow::DataflowMetainfo;
71use mz_transform::notice::OptimizerNotice;
72use serde::ser::SerializeSeq;
73use serde::{Deserialize, Serialize};
74use timely::progress::Antichain;
75use tracing::debug;
76
77use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
78use crate::durable;
79use crate::durable::objects::item_type;
80
/// Types that can be refreshed in place from a value of type `T`.
///
/// The `From<T>` supertrait means every implementor can also be built from
/// scratch out of a `T`; `update_from` is the in-place counterpart.
pub trait UpdateFrom<T>: From<T> {
    /// Overwrites the relevant parts of `self` with the contents of `from`.
    fn update_from(&mut self, from: T);
}
85
/// Catalog representation of a database, convertible to and from
/// `durable::Database`.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    /// Name of the database.
    pub name: String,
    /// ID of the database.
    pub id: DatabaseId,
    /// OID of the database.
    pub oid: u32,
    /// Schemas keyed by ID; serialized with the map keys rendered as strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Lookup from schema name to schema ID.
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    /// Role that owns the database.
    pub owner_id: RoleId,
    /// Privileges granted on the database.
    pub privileges: PrivilegeMap,
}
97
98impl From<Database> for durable::Database {
99 fn from(database: Database) -> durable::Database {
100 durable::Database {
101 id: database.id,
102 oid: database.oid,
103 name: database.name,
104 owner_id: database.owner_id,
105 privileges: database.privileges.into_all_values().collect(),
106 }
107 }
108}
109
110impl From<durable::Database> for Database {
111 fn from(
112 durable::Database {
113 id,
114 oid,
115 name,
116 owner_id,
117 privileges,
118 }: durable::Database,
119 ) -> Database {
120 Database {
121 id,
122 oid,
123 schemas_by_id: BTreeMap::new(),
124 schemas_by_name: BTreeMap::new(),
125 name,
126 owner_id,
127 privileges: PrivilegeMap::from_mz_acl_items(privileges),
128 }
129 }
130}
131
132impl UpdateFrom<durable::Database> for Database {
133 fn update_from(
134 &mut self,
135 durable::Database {
136 id,
137 oid,
138 name,
139 owner_id,
140 privileges,
141 }: durable::Database,
142 ) {
143 self.id = id;
144 self.oid = oid;
145 self.name = name;
146 self.owner_id = owner_id;
147 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
148 }
149}
150
/// Catalog representation of a schema, convertible to and from
/// `durable::Schema`.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
    /// Qualified name: the owning database specifier plus the schema name.
    pub name: QualifiedSchemaName,
    /// Specifier identifying the schema.
    pub id: SchemaSpecifier,
    /// OID of the schema.
    pub oid: u32,
    /// Name-to-ID lookup for items in the schema.
    pub items: BTreeMap<String, CatalogItemId>,
    /// Name-to-ID lookup for functions in the schema.
    pub functions: BTreeMap<String, CatalogItemId>,
    /// Name-to-ID lookup for types in the schema.
    pub types: BTreeMap<String, CatalogItemId>,
    /// Role that owns the schema.
    pub owner_id: RoleId,
    /// Privileges granted on the schema.
    pub privileges: PrivilegeMap,
}
162
163impl From<Schema> for durable::Schema {
164 fn from(schema: Schema) -> durable::Schema {
165 durable::Schema {
166 id: schema.id.into(),
167 oid: schema.oid,
168 name: schema.name.schema,
169 database_id: schema.name.database.id(),
170 owner_id: schema.owner_id,
171 privileges: schema.privileges.into_all_values().collect(),
172 }
173 }
174}
175
176impl From<durable::Schema> for Schema {
177 fn from(
178 durable::Schema {
179 id,
180 oid,
181 name,
182 database_id,
183 owner_id,
184 privileges,
185 }: durable::Schema,
186 ) -> Schema {
187 Schema {
188 name: QualifiedSchemaName {
189 database: database_id.into(),
190 schema: name,
191 },
192 id: id.into(),
193 oid,
194 items: BTreeMap::new(),
195 functions: BTreeMap::new(),
196 types: BTreeMap::new(),
197 owner_id,
198 privileges: PrivilegeMap::from_mz_acl_items(privileges),
199 }
200 }
201}
202
203impl UpdateFrom<durable::Schema> for Schema {
204 fn update_from(
205 &mut self,
206 durable::Schema {
207 id,
208 oid,
209 name,
210 database_id,
211 owner_id,
212 privileges,
213 }: durable::Schema,
214 ) {
215 self.name = QualifiedSchemaName {
216 database: database_id.into(),
217 schema: name,
218 };
219 self.id = id.into();
220 self.oid = oid;
221 self.owner_id = owner_id;
222 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
223 }
224}
225
/// Catalog representation of a role, convertible to and from
/// `durable::Role`.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    /// Name of the role.
    pub name: String,
    /// ID of the role.
    pub id: RoleId,
    /// OID of the role.
    pub oid: u32,
    /// Attributes of the role.
    pub attributes: RoleAttributes,
    /// Membership information for the role.
    pub membership: RoleMembership,
    /// Variables stored for the role; iterate them with [`Role::vars`].
    pub vars: RoleVars,
}
235
236impl Role {
237 pub fn is_user(&self) -> bool {
238 self.id.is_user()
239 }
240
241 pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
242 self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
243 }
244}
245
246impl From<Role> for durable::Role {
247 fn from(role: Role) -> durable::Role {
248 durable::Role {
249 id: role.id,
250 oid: role.oid,
251 name: role.name,
252 attributes: role.attributes,
253 membership: role.membership,
254 vars: role.vars,
255 }
256 }
257}
258
259impl From<durable::Role> for Role {
260 fn from(
261 durable::Role {
262 id,
263 oid,
264 name,
265 attributes,
266 membership,
267 vars,
268 }: durable::Role,
269 ) -> Self {
270 Role {
271 name,
272 id,
273 oid,
274 attributes,
275 membership,
276 vars,
277 }
278 }
279}
280
281impl UpdateFrom<durable::Role> for Role {
282 fn update_from(
283 &mut self,
284 durable::Role {
285 id,
286 oid,
287 name,
288 attributes,
289 membership,
290 vars,
291 }: durable::Role,
292 ) {
293 self.id = id;
294 self.oid = oid;
295 self.name = name;
296 self.attributes = attributes;
297 self.membership = membership;
298 self.vars = vars;
299 }
300}
301
/// Catalog representation of a role's authentication data, convertible to
/// and from `durable::RoleAuth`.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RoleAuth {
    /// ID of the role this record belongs to.
    pub role_id: RoleId,
    /// Password hash, if one is set.
    pub password_hash: Option<String>,
    /// Last-update marker. NOTE(review): the unit/epoch of this `u64` is not
    /// visible here — confirm against the durable definition.
    pub updated_at: u64,
}
308
309impl From<RoleAuth> for durable::RoleAuth {
310 fn from(role_auth: RoleAuth) -> durable::RoleAuth {
311 durable::RoleAuth {
312 role_id: role_auth.role_id,
313 password_hash: role_auth.password_hash,
314 updated_at: role_auth.updated_at,
315 }
316 }
317}
318
319impl From<durable::RoleAuth> for RoleAuth {
320 fn from(
321 durable::RoleAuth {
322 role_id,
323 password_hash,
324 updated_at,
325 }: durable::RoleAuth,
326 ) -> RoleAuth {
327 RoleAuth {
328 role_id,
329 password_hash,
330 updated_at,
331 }
332 }
333}
334
335impl UpdateFrom<durable::RoleAuth> for RoleAuth {
336 fn update_from(&mut self, from: durable::RoleAuth) {
337 self.role_id = from.role_id;
338 self.password_hash = from.password_hash;
339 }
340}
341
/// Catalog representation of a cluster, convertible to and from
/// `durable::Cluster`.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    /// Name of the cluster.
    pub name: String,
    /// ID of the cluster.
    pub id: ClusterId,
    /// Configuration of the cluster.
    pub config: ClusterConfig,
    /// Maps each log variant to a `GlobalId`; skipped during serialization.
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    /// Catalog items bound to this cluster.
    pub bound_objects: BTreeSet<CatalogItemId>,
    /// Lookup from replica name to replica ID; read via [`Cluster::replica_id`].
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    /// Replicas keyed by ID; read via [`Cluster::replicas`] and
    /// [`Cluster::replica`]. Serialized with map keys rendered as strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    /// Role that owns the cluster.
    pub owner_id: RoleId,
    /// Privileges granted on the cluster.
    pub privileges: PrivilegeMap,
}
358
359impl Cluster {
360 pub fn role(&self) -> ClusterRole {
362 if self.name == MZ_SYSTEM_CLUSTER.name {
365 ClusterRole::SystemCritical
366 } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
367 ClusterRole::System
368 } else {
369 ClusterRole::User
370 }
371 }
372
373 pub fn is_managed(&self) -> bool {
375 matches!(self.config.variant, ClusterVariant::Managed { .. })
376 }
377
378 pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
380 self.replicas().filter(|r| !r.config.location.internal())
381 }
382
383 pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
385 self.replicas_by_id_.values()
386 }
387
388 pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
390 self.replicas_by_id_.get(&replica_id)
391 }
392
393 pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
395 self.replica_id_by_name_.get(name).copied()
396 }
397
398 pub fn availability_zones(&self) -> Option<&[String]> {
400 match &self.config.variant {
401 ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
402 ClusterVariant::Unmanaged => None,
403 }
404 }
405
406 pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
407 let name = self.name.clone();
408 let variant = match &self.config.variant {
409 ClusterVariant::Managed(ClusterVariantManaged {
410 size,
411 availability_zones,
412 logging,
413 replication_factor,
414 optimizer_feature_overrides,
415 schedule,
416 }) => {
417 let introspection = match logging {
418 ReplicaLogging {
419 log_logging,
420 interval: Some(interval),
421 } => Some(ComputeReplicaIntrospectionConfig {
422 debugging: *log_logging,
423 interval: interval.clone(),
424 }),
425 ReplicaLogging {
426 log_logging: _,
427 interval: None,
428 } => None,
429 };
430 let compute = ComputeReplicaConfig { introspection };
431 CreateClusterVariant::Managed(CreateClusterManagedPlan {
432 replication_factor: replication_factor.clone(),
433 size: size.clone(),
434 availability_zones: availability_zones.clone(),
435 compute,
436 optimizer_feature_overrides: optimizer_feature_overrides.clone(),
437 schedule: schedule.clone(),
438 })
439 }
440 ClusterVariant::Unmanaged => {
441 return Err(PlanError::Unsupported {
444 feature: "SHOW CREATE for unmanaged clusters".to_string(),
445 discussion_no: None,
446 });
447 }
448 };
449 let workload_class = self.config.workload_class.clone();
450 Ok(CreateClusterPlan {
451 name,
452 variant,
453 workload_class,
454 })
455 }
456}
457
458impl From<Cluster> for durable::Cluster {
459 fn from(cluster: Cluster) -> durable::Cluster {
460 durable::Cluster {
461 id: cluster.id,
462 name: cluster.name,
463 owner_id: cluster.owner_id,
464 privileges: cluster.privileges.into_all_values().collect(),
465 config: cluster.config.into(),
466 }
467 }
468}
469
470impl From<durable::Cluster> for Cluster {
471 fn from(
472 durable::Cluster {
473 id,
474 name,
475 owner_id,
476 privileges,
477 config,
478 }: durable::Cluster,
479 ) -> Self {
480 Cluster {
481 name: name.clone(),
482 id,
483 bound_objects: BTreeSet::new(),
484 log_indexes: BTreeMap::new(),
485 replica_id_by_name_: BTreeMap::new(),
486 replicas_by_id_: BTreeMap::new(),
487 owner_id,
488 privileges: PrivilegeMap::from_mz_acl_items(privileges),
489 config: config.into(),
490 }
491 }
492}
493
494impl UpdateFrom<durable::Cluster> for Cluster {
495 fn update_from(
496 &mut self,
497 durable::Cluster {
498 id,
499 name,
500 owner_id,
501 privileges,
502 config,
503 }: durable::Cluster,
504 ) {
505 self.id = id;
506 self.name = name;
507 self.owner_id = owner_id;
508 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
509 self.config = config.into();
510 }
511}
512
/// Catalog representation of a cluster replica, convertible into
/// `durable::ClusterReplica`.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    /// Name of the replica.
    pub name: String,
    /// ID of the cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// ID of the replica.
    pub replica_id: ReplicaId,
    /// Configuration of the replica.
    pub config: ReplicaConfig,
    /// Role that owns the replica.
    pub owner_id: RoleId,
}
521
522impl From<ClusterReplica> for durable::ClusterReplica {
523 fn from(replica: ClusterReplica) -> durable::ClusterReplica {
524 durable::ClusterReplica {
525 cluster_id: replica.cluster_id,
526 replica_id: replica.replica_id,
527 name: replica.name,
528 config: replica.config.into(),
529 owner_id: replica.owner_id,
530 }
531 }
532}
533
/// A cluster status paired with a UTC timestamp.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    /// The reported status.
    pub status: ClusterStatus,
    /// Timestamp associated with the status. NOTE(review): presumably when
    /// the status was observed — confirm against the producer.
    pub time: DateTime<Utc>,
}
539
/// A collection of [`SourceReference`]s together with an update marker.
///
/// Convertible to and from both the durable and the `mz_sql::plan`
/// representations.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
    /// Last-update marker. NOTE(review): unit/epoch not visible here.
    pub updated_at: u64,
    /// The individual references.
    pub references: Vec<SourceReference>,
}
545
/// A single named reference, with an optional namespace and a list of
/// column names.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
    /// Name of the referenced object.
    pub name: String,
    /// Namespace of the referenced object, if any.
    pub namespace: Option<String>,
    /// Column names of the referenced object.
    pub columns: Vec<String>,
}
552
553impl From<SourceReference> for durable::SourceReference {
554 fn from(source_reference: SourceReference) -> durable::SourceReference {
555 durable::SourceReference {
556 name: source_reference.name,
557 namespace: source_reference.namespace,
558 columns: source_reference.columns,
559 }
560 }
561}
562
563impl SourceReferences {
564 pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
565 durable::SourceReferences {
566 source_id,
567 updated_at: self.updated_at,
568 references: self.references.into_iter().map(Into::into).collect(),
569 }
570 }
571}
572
573impl From<durable::SourceReference> for SourceReference {
574 fn from(source_reference: durable::SourceReference) -> SourceReference {
575 SourceReference {
576 name: source_reference.name,
577 namespace: source_reference.namespace,
578 columns: source_reference.columns,
579 }
580 }
581}
582
583impl From<durable::SourceReferences> for SourceReferences {
584 fn from(source_references: durable::SourceReferences) -> SourceReferences {
585 SourceReferences {
586 updated_at: source_references.updated_at,
587 references: source_references
588 .references
589 .into_iter()
590 .map(|source_reference| source_reference.into())
591 .collect(),
592 }
593 }
594}
595
596impl From<mz_sql::plan::SourceReference> for SourceReference {
597 fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
598 SourceReference {
599 name: source_reference.name,
600 namespace: source_reference.namespace,
601 columns: source_reference.columns,
602 }
603 }
604}
605
606impl From<mz_sql::plan::SourceReferences> for SourceReferences {
607 fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
608 SourceReferences {
609 updated_at: source_references.updated_at,
610 references: source_references
611 .references
612 .into_iter()
613 .map(|source_reference| source_reference.into())
614 .collect(),
615 }
616 }
617}
618
619impl From<SourceReferences> for mz_sql::plan::SourceReferences {
620 fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
621 mz_sql::plan::SourceReferences {
622 updated_at: source_references.updated_at,
623 references: source_references
624 .references
625 .into_iter()
626 .map(|source_reference| source_reference.into())
627 .collect(),
628 }
629 }
630}
631
632impl From<SourceReference> for mz_sql::plan::SourceReference {
633 fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
634 mz_sql::plan::SourceReference {
635 name: source_reference.name,
636 namespace: source_reference.namespace,
637 columns: source_reference.columns,
638 }
639 }
640}
641
/// A catalog item together with its identity, ownership, privileges, and
/// reverse-dependency lists.
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    /// The item itself.
    pub item: CatalogItem,
    /// IDs of items recorded as referencing this one; skipped during
    /// serialization.
    #[serde(skip)]
    pub referenced_by: Vec<CatalogItemId>,
    /// IDs of items recorded as using this one; skipped during serialization.
    // NOTE(review): the precise distinction between "references" and "uses"
    // is not visible in this chunk — confirm before relying on it.
    #[serde(skip)]
    pub used_by: Vec<CatalogItemId>,
    /// ID of the entry.
    pub id: CatalogItemId,
    /// OID of the entry.
    pub oid: u32,
    /// Qualified name of the entry.
    pub name: QualifiedItemName,
    /// Role that owns the entry.
    pub owner_id: RoleId,
    /// Privileges granted on the entry.
    pub privileges: PrivilegeMap,
}
658
/// A [`CatalogEntry`] paired with a relation-version selector.
///
/// The selector is what `relation_desc` and `global_id` use to pick a
/// version of the underlying item. Derefs to the wrapped [`CatalogEntry`].
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
    /// The wrapped catalog entry.
    pub entry: CatalogEntry,
    /// Which version of the entry's relation this value refers to.
    pub version: RelationVersionSelector,
}
681
682impl CatalogCollectionEntry {
683 pub fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
684 self.item().relation_desc(self.version)
685 }
686}
687
688impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
689 fn relation_desc(&self) -> Option<Cow<'_, RelationDesc>> {
690 self.item().relation_desc(self.version)
691 }
692
693 fn global_id(&self) -> GlobalId {
694 self.entry
695 .item()
696 .global_id_for_version(self.version)
697 .expect("catalog corruption, missing version!")
698 }
699}
700
701impl Deref for CatalogCollectionEntry {
702 type Target = CatalogEntry;
703
704 fn deref(&self) -> &CatalogEntry {
705 &self.entry
706 }
707}
708
// Every method except `at_version` forwards to the wrapped `CatalogEntry`
// (or to its item); the version selector plays no part in them.
impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
    /// Forwards to the wrapped entry's `name`.
    fn name(&self) -> &QualifiedItemName {
        self.entry.name()
    }

    /// Forwards to the wrapped entry's `id`.
    fn id(&self) -> CatalogItemId {
        self.entry.id()
    }

    /// Boxes the wrapped entry's `global_ids` iterator.
    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.entry.global_ids())
    }

    /// Forwards to the wrapped entry's `oid`.
    fn oid(&self) -> u32 {
        self.entry.oid()
    }

    /// Forwards to the wrapped entry's `func`.
    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.entry.func()
    }

    /// Forwards to the wrapped entry's `source_desc`.
    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.entry.source_desc()
    }

    /// Forwards to the wrapped entry's `CatalogItem::connection`.
    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        // Called through the trait path explicitly, unlike the other
        // forwarders. NOTE(review): presumably to avoid resolving to a
        // different `connection` method on `CatalogEntry` — confirm.
        mz_sql::catalog::CatalogItem::connection(&self.entry)
    }

    /// Forwards to the wrapped entry's `create_sql`.
    fn create_sql(&self) -> &str {
        self.entry.create_sql()
    }

    /// Forwards to the wrapped entry's `item_type`.
    fn item_type(&self) -> SqlCatalogItemType {
        self.entry.item_type()
    }

    /// Forwards to the wrapped entry's `index_details`.
    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        self.entry.index_details()
    }

    /// Forwards to the wrapped entry's `writable_table_details`.
    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        self.entry.writable_table_details()
    }

    /// Forwards to the wrapped entry's `replacement_target`.
    fn replacement_target(&self) -> Option<CatalogItemId> {
        self.entry.replacement_target()
    }

    /// Forwards to the wrapped entry's `type_details`.
    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        self.entry.type_details()
    }

    /// Forwards to the wrapped entry's `references`.
    fn references(&self) -> &ResolvedIds {
        self.entry.references()
    }

    /// Forwards to the wrapped entry's `uses`.
    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.entry.uses()
    }

    /// Forwards to the wrapped entry's `referenced_by`.
    fn referenced_by(&self) -> &[CatalogItemId] {
        self.entry.referenced_by()
    }

    /// Forwards to the wrapped entry's `used_by`.
    fn used_by(&self) -> &[CatalogItemId] {
        self.entry.used_by()
    }

    /// Forwards to the wrapped entry's `subsource_details`.
    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.entry.subsource_details()
    }

    /// Forwards to the wrapped entry's `source_export_details`.
    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.entry.source_export_details()
    }

    /// Forwards to the wrapped entry's `is_progress_source`.
    fn is_progress_source(&self) -> bool {
        self.entry.is_progress_source()
    }

    /// Forwards to the wrapped entry's `progress_id`.
    fn progress_id(&self) -> Option<CatalogItemId> {
        self.entry.progress_id()
    }

    /// Returns the owner by value, dereferencing what the wrapped entry's
    /// `owner_id` hands back.
    fn owner_id(&self) -> RoleId {
        *self.entry.owner_id()
    }

    /// Forwards to the wrapped entry's `privileges`.
    fn privileges(&self) -> &PrivilegeMap {
        self.entry.privileges()
    }

    /// Returns the cluster ID reported by the wrapped entry's item.
    fn cluster_id(&self) -> Option<ClusterId> {
        self.entry.item().cluster_id()
    }

    /// Builds a new boxed `CatalogCollectionEntry` over a clone of the
    /// wrapped entry, using `version` as its selector.
    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        Box::new(CatalogCollectionEntry {
            entry: self.entry.clone(),
            version,
        })
    }

    /// Forwards to the wrapped entry's `latest_version`.
    fn latest_version(&self) -> Option<RelationVersion> {
        self.entry.latest_version()
    }
}
832
/// The kinds of item a catalog entry can hold, one variant per item type.
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    /// A table.
    Table(Table),
    /// A source.
    Source(Source),
    /// A log.
    Log(Log),
    /// A view.
    View(View),
    /// A materialized view.
    MaterializedView(MaterializedView),
    /// A sink.
    Sink(Sink),
    /// An index.
    Index(Index),
    /// A type.
    Type(Type),
    /// A function.
    Func(Func),
    /// A secret.
    Secret(Secret),
    /// A connection.
    Connection(Connection),
    /// A continual task.
    ContinualTask(ContinualTask),
}
848
849impl From<CatalogEntry> for durable::Item {
850 fn from(entry: CatalogEntry) -> durable::Item {
851 let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
852 durable::Item {
853 id: entry.id,
854 oid: entry.oid,
855 global_id,
856 schema_id: entry.name.qualifiers.schema_spec.into(),
857 name: entry.name.item,
858 create_sql,
859 owner_id: entry.owner_id,
860 privileges: entry.privileges.into_all_values().collect(),
861 extra_versions,
862 }
863 }
864}
865
/// Catalog representation of a table.
#[derive(Debug, Clone, Serialize)]
pub struct Table {
    /// SQL text that creates the table, if available.
    pub create_sql: Option<String>,
    /// Versioned relation description of the table.
    pub desc: VersionedRelationDesc,
    /// `GlobalId` of the collection backing each relation version; serialized
    /// with map keys rendered as strings. `global_id_writes` expects this map
    /// to be non-empty.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// Connection ID associated with the table, if any; skipped during
    /// serialization.
    // NOTE(review): presumably set only for connection-scoped (temporary)
    // tables — confirm.
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    /// Resolved IDs recorded for the table.
    pub resolved_ids: ResolvedIds,
    /// Custom logical compaction window, if one is configured.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Flag marking the table as a retained-metrics object.
    pub is_retained_metrics_object: bool,
    /// Where the table's data comes from.
    pub data_source: TableDataSource,
}
890
891impl Table {
892 pub fn timeline(&self) -> Timeline {
893 match &self.data_source {
894 TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
897 TableDataSource::DataSource { timeline, .. } => timeline.clone(),
898 }
899 }
900
901 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
903 self.collections.values().copied()
904 }
905
906 pub fn global_id_writes(&self) -> GlobalId {
908 *self
909 .collections
910 .last_key_value()
911 .expect("at least one version of a table")
912 .1
913 }
914
915 pub fn collection_descs(
917 &self,
918 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
919 self.collections.iter().map(|(version, gid)| {
920 let desc = self
921 .desc
922 .at_version(RelationVersionSelector::Specific(*version));
923 (*gid, *version, desc)
924 })
925 }
926
927 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
929 let (version, _gid) = self
930 .collections
931 .iter()
932 .find(|(_version, gid)| *gid == id)
933 .expect("GlobalId to exist");
934 self.desc
935 .at_version(RelationVersionSelector::Specific(*version))
936 }
937}
938
/// Where a table's data comes from.
#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
    /// The table is populated by table writes.
    TableWrites {
        /// Expressions kept alongside the table; skipped during
        /// serialization.
        // NOTE(review): presumably the per-column default expressions —
        // confirm against the planner.
        #[serde(skip)]
        defaults: Vec<Expr<Aug>>,
    },

    /// The table is populated from a data source.
    DataSource {
        /// Description of the data source.
        desc: DataSourceDesc,
        /// Timeline of the data source; this is what `Table::timeline`
        /// returns for this variant.
        timeline: Timeline,
    },
}
954
/// Describes the origin of the data in a source or table.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum DataSourceDesc {
    /// An ingestion running on a cluster.
    Ingestion {
        /// Description of the source being ingested.
        desc: SourceDesc<ReferencedConnection>,
        /// Cluster the ingestion runs on.
        cluster_id: ClusterId,
    },
    /// An ingestion that additionally carries a progress subsource and
    /// export configuration. NOTE(review): per the variant name this is the
    /// legacy source syntax — confirm.
    OldSyntaxIngestion {
        /// Description of the source being ingested.
        desc: SourceDesc<ReferencedConnection>,
        /// Cluster the ingestion runs on.
        cluster_id: ClusterId,
        /// Catalog ID of the progress subsource.
        progress_subsource: CatalogItemId,
        /// Encoding and envelope configuration; read by `formats` and
        /// `envelope`.
        data_config: SourceExportDataConfig<ReferencedConnection>,
        /// Export details.
        details: SourceExportDetails,
    },
    /// An export of an existing ingestion.
    IngestionExport {
        /// Catalog ID of the ingestion this export belongs to.
        ingestion_id: CatalogItemId,
        /// Name of the referenced external object.
        external_reference: UnresolvedItemName,
        /// Export details.
        details: SourceExportDetails,
        /// Encoding and envelope configuration; read by `formats` and
        /// `envelope`.
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// An introspection collection of the given type.
    Introspection(IntrospectionType),
    /// A progress collection.
    Progress,
    /// A webhook.
    Webhook {
        /// Optional request validation.
        validate_using: Option<WebhookValidation>,
        /// Format of the request body.
        body_format: WebhookBodyFormat,
        /// Header handling configuration.
        headers: WebhookHeaders,
        /// Cluster the webhook runs on.
        cluster_id: ClusterId,
    },
    /// NOTE(review): presumably data originating from the catalog itself —
    /// confirm.
    Catalog,
}
1003
1004impl From<IntrospectionType> for DataSourceDesc {
1005 fn from(typ: IntrospectionType) -> Self {
1006 Self::Introspection(typ)
1007 }
1008}
1009
1010impl DataSourceDesc {
1011 pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1013 match &self {
1014 DataSourceDesc::Ingestion { .. } => (None, None),
1015 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1016 match &data_config.encoding.as_ref() {
1017 Some(encoding) => match &encoding.key {
1018 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1019 None => (None, Some(encoding.value.type_())),
1020 },
1021 None => (None, None),
1022 }
1023 }
1024 DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1025 Some(encoding) => match &encoding.key {
1026 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1027 None => (None, Some(encoding.value.type_())),
1028 },
1029 None => (None, None),
1030 },
1031 DataSourceDesc::Introspection(_)
1032 | DataSourceDesc::Webhook { .. }
1033 | DataSourceDesc::Progress
1034 | DataSourceDesc::Catalog => (None, None),
1035 }
1036 }
1037
1038 pub fn envelope(&self) -> Option<&str> {
1040 fn envelope_string(envelope: &SourceEnvelope) -> &str {
1045 match envelope {
1046 SourceEnvelope::None(_) => "none",
1047 SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1048 mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1049 mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1050 "debezium"
1054 }
1055 mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1056 "upsert-value-err-inline"
1057 }
1058 },
1059 SourceEnvelope::CdcV2 => {
1060 "materialize"
1063 }
1064 }
1065 }
1066
1067 match self {
1068 DataSourceDesc::Ingestion { .. } => None,
1073 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1074 Some(envelope_string(&data_config.envelope))
1075 }
1076 DataSourceDesc::IngestionExport { data_config, .. } => {
1077 Some(envelope_string(&data_config.envelope))
1078 }
1079 DataSourceDesc::Introspection(_)
1080 | DataSourceDesc::Webhook { .. }
1081 | DataSourceDesc::Progress
1082 | DataSourceDesc::Catalog => None,
1083 }
1084 }
1085}
1086
/// Catalog representation of a source.
#[derive(Debug, Clone, Serialize)]
pub struct Source {
    /// SQL text that creates the source, if available.
    pub create_sql: Option<String>,
    /// `GlobalId` of the source.
    pub global_id: GlobalId,
    /// Where the source's data comes from; skipped during serialization.
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    /// Relation description of the source.
    pub desc: RelationDesc,
    /// Timeline of the source.
    pub timeline: Timeline,
    /// Resolved IDs recorded for the source.
    pub resolved_ids: ResolvedIds,
    /// Custom logical compaction window, if one is configured.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Flag marking the source as a retained-metrics object.
    pub is_retained_metrics_object: bool,
}
1110
1111impl Source {
1112 pub fn new(
1119 plan: CreateSourcePlan,
1120 global_id: GlobalId,
1121 resolved_ids: ResolvedIds,
1122 custom_logical_compaction_window: Option<CompactionWindow>,
1123 is_retained_metrics_object: bool,
1124 ) -> Source {
1125 Source {
1126 create_sql: Some(plan.source.create_sql),
1127 data_source: match plan.source.data_source {
1128 mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1129 desc,
1130 cluster_id: plan
1131 .in_cluster
1132 .expect("ingestion-based sources must be given a cluster ID"),
1133 },
1134 mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1135 desc,
1136 progress_subsource,
1137 data_config,
1138 details,
1139 } => DataSourceDesc::OldSyntaxIngestion {
1140 desc,
1141 cluster_id: plan
1142 .in_cluster
1143 .expect("ingestion-based sources must be given a cluster ID"),
1144 progress_subsource,
1145 data_config,
1146 details,
1147 },
1148 mz_sql::plan::DataSourceDesc::Progress => {
1149 assert!(
1150 plan.in_cluster.is_none(),
1151 "subsources must not have a host config or cluster_id defined"
1152 );
1153 DataSourceDesc::Progress
1154 }
1155 mz_sql::plan::DataSourceDesc::IngestionExport {
1156 ingestion_id,
1157 external_reference,
1158 details,
1159 data_config,
1160 } => {
1161 assert!(
1162 plan.in_cluster.is_none(),
1163 "subsources must not have a host config or cluster_id defined"
1164 );
1165 DataSourceDesc::IngestionExport {
1166 ingestion_id,
1167 external_reference,
1168 details,
1169 data_config,
1170 }
1171 }
1172 mz_sql::plan::DataSourceDesc::Webhook {
1173 validate_using,
1174 body_format,
1175 headers,
1176 cluster_id,
1177 } => {
1178 mz_ore::soft_assert_or_log!(
1179 cluster_id.is_none(),
1180 "cluster_id set at Source level for Webhooks"
1181 );
1182 DataSourceDesc::Webhook {
1183 validate_using,
1184 body_format,
1185 headers,
1186 cluster_id: plan
1187 .in_cluster
1188 .expect("webhook sources must be given a cluster ID"),
1189 }
1190 }
1191 },
1192 desc: plan.source.desc,
1193 global_id,
1194 timeline: plan.timeline,
1195 resolved_ids,
1196 custom_logical_compaction_window: plan
1197 .source
1198 .compaction_window
1199 .or(custom_logical_compaction_window),
1200 is_retained_metrics_object,
1201 }
1202 }
1203
1204 pub fn source_type(&self) -> &str {
1206 match &self.data_source {
1207 DataSourceDesc::Ingestion { desc, .. }
1208 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
1209 DataSourceDesc::Progress => "progress",
1210 DataSourceDesc::IngestionExport { .. } => "subsource",
1211 DataSourceDesc::Introspection(_) | DataSourceDesc::Catalog => "source",
1212 DataSourceDesc::Webhook { .. } => "webhook",
1213 }
1214 }
1215
1216 pub fn connection_id(&self) -> Option<CatalogItemId> {
1218 match &self.data_source {
1219 DataSourceDesc::Ingestion { desc, .. }
1220 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
1221 DataSourceDesc::IngestionExport { .. }
1222 | DataSourceDesc::Introspection(_)
1223 | DataSourceDesc::Webhook { .. }
1224 | DataSourceDesc::Progress
1225 | DataSourceDesc::Catalog => None,
1226 }
1227 }
1228
1229 pub fn global_id(&self) -> GlobalId {
1231 self.global_id
1232 }
1233
1234 pub fn user_controllable_persist_shard_count(&self) -> i64 {
1242 match &self.data_source {
1243 DataSourceDesc::Ingestion { .. } => 0,
1244 DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1245 match &desc.connection {
1246 GenericSourceConnection::Postgres(_)
1250 | GenericSourceConnection::MySql(_)
1251 | GenericSourceConnection::SqlServer(_) => 0,
1252 GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
1253 LoadGenerator::Clock
1255 | LoadGenerator::Counter { .. }
1256 | LoadGenerator::Datums
1257 | LoadGenerator::KeyValue(_) => 1,
1258 LoadGenerator::Auction
1259 | LoadGenerator::Marketing
1260 | LoadGenerator::Tpch { .. } => 0,
1261 },
1262 GenericSourceConnection::Kafka(_) => 1,
1263 }
1264 }
1265 DataSourceDesc::IngestionExport { .. } => 1,
1268 DataSourceDesc::Webhook { .. } => 1,
1269 DataSourceDesc::Introspection(_)
1272 | DataSourceDesc::Progress
1273 | DataSourceDesc::Catalog => 0,
1274 }
1275 }
1276}
1277
/// Catalog representation of a log.
#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// Which log variant this is.
    pub variant: LogVariant,
    /// `GlobalId` of the log.
    pub global_id: GlobalId,
}
1285
1286impl Log {
1287 pub fn global_id(&self) -> GlobalId {
1289 self.global_id
1290 }
1291}
1292
/// Catalog representation of a sink.
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    /// SQL text that creates the sink.
    pub create_sql: String,
    /// `GlobalId` of the sink.
    pub global_id: GlobalId,
    /// `GlobalId` of the collection the sink reads from.
    pub from: GlobalId,
    /// Connection the sink writes through.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// Envelope used by the sink.
    pub envelope: SinkEnvelope,
    /// Snapshot flag. NOTE(review): presumably whether an initial snapshot
    /// of `from` is emitted — confirm.
    pub with_snapshot: bool,
    /// Version number of the sink. NOTE(review): semantics not visible in
    /// this chunk.
    pub version: u64,
    /// Resolved IDs recorded for the sink.
    pub resolved_ids: ResolvedIds,
    /// Cluster the sink runs on.
    pub cluster_id: ClusterId,
    /// Commit interval, if one is configured.
    pub commit_interval: Option<Duration>,
}
1318
1319impl Sink {
1320 pub fn sink_type(&self) -> &str {
1321 self.connection.name()
1322 }
1323
1324 pub fn envelope(&self) -> Option<&str> {
1326 match &self.envelope {
1327 SinkEnvelope::Debezium => Some("debezium"),
1328 SinkEnvelope::Upsert => Some("upsert"),
1329 SinkEnvelope::Append => Some("append"),
1330 }
1331 }
1332
1333 pub fn combined_format(&self) -> Option<Cow<'_, str>> {
1338 match &self.connection {
1339 StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
1340 StorageSinkConnection::Iceberg(_) => None,
1341 }
1342 }
1343
1344 pub fn formats(&self) -> Option<(Option<&str>, &str)> {
1346 match &self.connection {
1347 StorageSinkConnection::Kafka(connection) => {
1348 let key_format = connection
1349 .format
1350 .key_format
1351 .as_ref()
1352 .map(|f| f.get_format_name());
1353 let value_format = connection.format.value_format.get_format_name();
1354 Some((key_format, value_format))
1355 }
1356 StorageSinkConnection::Iceberg(_) => None,
1357 }
1358 }
1359
1360 pub fn connection_id(&self) -> Option<CatalogItemId> {
1361 self.connection.connection_id()
1362 }
1363
1364 pub fn global_id(&self) -> GlobalId {
1366 self.global_id
1367 }
1368}
1369
/// Catalog representation of a (non-materialized) view (`CatalogItem::View`).
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// SQL text of the view's `CREATE` statement; re-parsed whenever references are rewritten.
    pub create_sql: String,
    /// The [`GlobalId`] of this view.
    pub global_id: GlobalId,
    /// The view's expression in HIR form.
    pub raw_expr: Arc<HirRelationExpr>,
    /// The view's expression in optimized MIR form.
    pub locally_optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// The relation description returned by `CatalogItem::relation_desc`.
    pub desc: RelationDesc,
    /// The owning connection, if any. `CatalogItem::is_temporary` treats an item with a
    /// connection ID as temporary.
    pub conn_id: Option<ConnectionId>,
    /// IDs referenced by this item; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Additional item IDs that `CatalogItem::uses` reports on top of `resolved_ids`.
    pub dependencies: DependencyIds,
}
1389
1390impl View {
1391 pub fn global_id(&self) -> GlobalId {
1393 self.global_id
1394 }
1395}
1396
/// Catalog representation of a materialized view (`CatalogItem::MaterializedView`).
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// SQL text of the `CREATE MATERIALIZED VIEW` statement; re-parsed whenever it is rewritten.
    pub create_sql: String,
    /// The [`GlobalId`] of each version of this materialized view. The entry with the greatest
    /// version is the one returned by `global_id_writes`. Keys are serialized as strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// The view's expression in HIR form.
    pub raw_expr: Arc<HirRelationExpr>,
    /// The view's expression in optimized MIR form.
    pub locally_optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// The versioned relation description; a concrete description is obtained with
    /// `desc.at_version(..)`.
    pub desc: VersionedRelationDesc,
    /// IDs referenced by this item; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Additional item IDs that `CatalogItem::uses` reports on top of `resolved_ids`.
    pub dependencies: DependencyIds,
    /// If set, the item this materialized view is a replacement for. `apply_replacement`
    /// requires it on the replacement and clears it on the result.
    pub replacement_target: Option<CatalogItemId>,
    /// The cluster reported for this materialized view by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// NOTE(review): presumably pins the dataflow to one replica of the cluster — confirm.
    pub target_replica: Option<ReplicaId>,
    /// NOTE(review): presumably column indexes asserted to be non-null — confirm.
    pub non_null_assertions: Vec<usize>,
    /// Custom compaction window, if any; updated by `CatalogItem::update_retain_history`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// NOTE(review): optional refresh schedule; its consumers are not visible in this section.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// NOTE(review): optional initial as-of frontier; its consumers are not visible here.
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
    /// Optimized (MIR) dataflow plan, if available. Skipped during serialization.
    #[serde(skip)]
    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical (compute) dataflow plan, if available. Skipped during serialization.
    #[serde(skip)]
    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
    /// Dataflow metainfo (optimizer notices), if available. Skipped during serialization.
    #[serde(skip)]
    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
}
1447
1448impl MaterializedView {
1449 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1451 self.collections.values().copied()
1452 }
1453
1454 pub fn global_id_writes(&self) -> GlobalId {
1457 *self
1458 .collections
1459 .last_key_value()
1460 .expect("at least one version of a materialized view")
1461 .1
1462 }
1463
1464 pub fn collection_descs(
1466 &self,
1467 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
1468 self.collections.iter().map(|(version, gid)| {
1469 let desc = self
1470 .desc
1471 .at_version(RelationVersionSelector::Specific(*version));
1472 (*gid, *version, desc)
1473 })
1474 }
1475
1476 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
1478 let (version, _gid) = self
1479 .collections
1480 .iter()
1481 .find(|(_version, gid)| *gid == id)
1482 .expect("GlobalId to exist");
1483 self.desc
1484 .at_version(RelationVersionSelector::Specific(*version))
1485 }
1486
1487 pub fn apply_replacement(&mut self, replacement: Self) {
1489 let target_id = replacement
1490 .replacement_target
1491 .expect("replacement has target");
1492
1493 fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
1494 let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
1495 panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
1496 });
1497 if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
1498 cmvs
1499 } else {
1500 panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
1501 }
1502 }
1503
1504 let old_stmt = parse(&self.create_sql);
1505 let rpl_stmt = parse(&replacement.create_sql);
1506 let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
1507 if_exists: old_stmt.if_exists,
1508 name: old_stmt.name,
1509 columns: rpl_stmt.columns,
1510 replacement_for: None,
1511 in_cluster: rpl_stmt.in_cluster,
1512 in_cluster_replica: rpl_stmt.in_cluster_replica,
1513 query: rpl_stmt.query,
1514 as_of: rpl_stmt.as_of,
1515 with_options: rpl_stmt.with_options,
1516 };
1517 let create_sql = new_stmt.to_ast_string_stable();
1518
1519 let mut collections = std::mem::take(&mut self.collections);
1520 let latest_version = collections.keys().max().expect("at least one version");
1524 let new_version = latest_version.bump();
1525 collections.insert(new_version, replacement.global_id_writes());
1526
1527 let mut resolved_ids = replacement.resolved_ids;
1528 resolved_ids.remove_item(&target_id);
1529 let mut dependencies = replacement.dependencies;
1530 dependencies.0.remove(&target_id);
1531
1532 *self = Self {
1533 create_sql,
1534 collections,
1535 raw_expr: replacement.raw_expr,
1536 locally_optimized_expr: replacement.locally_optimized_expr,
1537 desc: replacement.desc,
1538 resolved_ids,
1539 dependencies,
1540 replacement_target: None,
1541 cluster_id: replacement.cluster_id,
1542 target_replica: replacement.target_replica,
1543 non_null_assertions: replacement.non_null_assertions,
1544 custom_logical_compaction_window: replacement.custom_logical_compaction_window,
1545 refresh_schedule: replacement.refresh_schedule,
1546 initial_as_of: replacement.initial_as_of,
1547 optimized_plan: replacement.optimized_plan,
1548 physical_plan: replacement.physical_plan,
1549 dataflow_metainfo: replacement.dataflow_metainfo,
1550 };
1551 }
1552}
1553
/// Catalog representation of an index (`CatalogItem::Index`).
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// SQL text of the index's `CREATE` statement; re-parsed whenever it is rewritten.
    pub create_sql: String,
    /// The [`GlobalId`] of this index.
    pub global_id: GlobalId,
    /// NOTE(review): presumably the collection this index is built on — confirm.
    pub on: GlobalId,
    /// The key expressions of the index.
    pub keys: Arc<[MirScalarExpr]>,
    /// The owning connection, if any. `CatalogItem::is_temporary` treats an item with a
    /// connection ID as temporary.
    pub conn_id: Option<ConnectionId>,
    /// IDs referenced by this item; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// The cluster reported for this index by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// Custom compaction window, if any; updated by `CatalogItem::update_retain_history`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// NOTE(review): flag whose consumers are not visible in this section of the file.
    pub is_retained_metrics_object: bool,
    /// Optimized (MIR) dataflow plan, if available. Skipped during serialization.
    #[serde(skip)]
    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical (compute) dataflow plan, if available. Skipped during serialization.
    #[serde(skip)]
    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
    /// Dataflow metainfo (optimizer notices), if available. Skipped during serialization.
    #[serde(skip)]
    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
}
1591
1592impl Index {
1593 pub fn global_id(&self) -> GlobalId {
1595 self.global_id
1596 }
1597}
1598
/// Catalog representation of a type (`CatalogItem::Type`).
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// SQL text of the type's `CREATE` statement, if it has one.
    pub create_sql: Option<String>,
    /// The [`GlobalId`] of this type.
    pub global_id: GlobalId,
    /// Details of the type. Skipped during serialization.
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// IDs referenced by this item; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
}
1610
/// Catalog representation of a function (`CatalogItem::Func`).
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// The statically allocated function definition; returned by `CatalogItem::func`. Skipped
    /// during serialization.
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
    /// The [`GlobalId`] of this function.
    pub global_id: GlobalId,
}
1619
/// Catalog representation of a secret (`CatalogItem::Secret`).
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// SQL text of the secret's `CREATE` statement; re-parsed whenever it is rewritten.
    pub create_sql: String,
    /// The [`GlobalId`] of this secret.
    pub global_id: GlobalId,
}
1627
/// Catalog representation of a connection (`CatalogItem::Connection`).
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// SQL text of the connection's `CREATE` statement; re-parsed whenever it is rewritten.
    pub create_sql: String,
    /// The [`GlobalId`] of this connection.
    pub global_id: GlobalId,
    /// The details of the connection.
    pub details: ConnectionDetails,
    /// IDs referenced by this item; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
}
1639
1640impl Connection {
1641 pub fn global_id(&self) -> GlobalId {
1643 self.global_id
1644 }
1645}
1646
/// Catalog representation of a continual task (`CatalogItem::ContinualTask`).
#[derive(Debug, Clone, Serialize)]
pub struct ContinualTask {
    /// SQL text of the task's `CREATE` statement; re-parsed whenever it is rewritten.
    pub create_sql: String,
    /// The [`GlobalId`] of this continual task.
    pub global_id: GlobalId,
    /// NOTE(review): presumably the collection the task takes as input — confirm.
    pub input_id: GlobalId,
    /// NOTE(review): looks like a flag for processing an initial snapshot — confirm.
    pub with_snapshot: bool,
    /// The task's expression in HIR form.
    pub raw_expr: Arc<HirRelationExpr>,
    /// The relation description returned by `CatalogItem::relation_desc`.
    pub desc: RelationDesc,
    /// IDs referenced by this item; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Additional item IDs that `CatalogItem::uses` reports on top of `resolved_ids`.
    pub dependencies: DependencyIds,
    /// The cluster reported for this task by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// NOTE(review): optional initial as-of frontier; its consumers are not visible here.
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
    /// Optimized (MIR) dataflow plan, if available. Skipped during serialization.
    #[serde(skip)]
    pub optimized_plan: Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
    /// Physical (compute) dataflow plan, if available. Skipped during serialization.
    #[serde(skip)]
    pub physical_plan: Option<Arc<DataflowDescription<ComputePlan>>>,
    /// Dataflow metainfo (optimizer notices), if available. Skipped during serialization.
    #[serde(skip)]
    pub dataflow_metainfo: Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
}
1685
1686impl ContinualTask {
1687 pub fn global_id(&self) -> GlobalId {
1689 self.global_id
1690 }
1691}
1692
/// In-memory catalog representation of a network policy. Converts to and from
/// `durable::NetworkPolicy`.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
    /// The name of the policy.
    pub name: String,
    /// The ID of the policy.
    pub id: NetworkPolicyId,
    /// The OID of the policy.
    pub oid: u32,
    /// The rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
    /// The role that owns the policy.
    pub owner_id: RoleId,
    /// The privileges on the policy. The durable form stores these as a flat collection of ACL
    /// items.
    pub privileges: PrivilegeMap,
}
1702
1703impl From<NetworkPolicy> for durable::NetworkPolicy {
1704 fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1705 durable::NetworkPolicy {
1706 id: policy.id,
1707 oid: policy.oid,
1708 name: policy.name,
1709 rules: policy.rules,
1710 owner_id: policy.owner_id,
1711 privileges: policy.privileges.into_all_values().collect(),
1712 }
1713 }
1714}
1715
1716impl From<durable::NetworkPolicy> for NetworkPolicy {
1717 fn from(
1718 durable::NetworkPolicy {
1719 id,
1720 oid,
1721 name,
1722 rules,
1723 owner_id,
1724 privileges,
1725 }: durable::NetworkPolicy,
1726 ) -> Self {
1727 NetworkPolicy {
1728 id,
1729 oid,
1730 name,
1731 rules,
1732 owner_id,
1733 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1734 }
1735 }
1736}
1737
1738impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1739 fn update_from(
1740 &mut self,
1741 durable::NetworkPolicy {
1742 id,
1743 oid,
1744 name,
1745 rules,
1746 owner_id,
1747 privileges,
1748 }: durable::NetworkPolicy,
1749 ) {
1750 self.id = id;
1751 self.oid = oid;
1752 self.name = name;
1753 self.rules = rules;
1754 self.owner_id = owner_id;
1755 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1756 }
1757}
1758
1759impl CatalogItem {
1760 pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1762 match self {
1763 CatalogItem::Table(_) => CatalogItemType::Table,
1764 CatalogItem::Source(_) => CatalogItemType::Source,
1765 CatalogItem::Log(_) => CatalogItemType::Source,
1766 CatalogItem::Sink(_) => CatalogItemType::Sink,
1767 CatalogItem::View(_) => CatalogItemType::View,
1768 CatalogItem::MaterializedView(_) => CatalogItemType::MaterializedView,
1769 CatalogItem::Index(_) => CatalogItemType::Index,
1770 CatalogItem::Type(_) => CatalogItemType::Type,
1771 CatalogItem::Func(_) => CatalogItemType::Func,
1772 CatalogItem::Secret(_) => CatalogItemType::Secret,
1773 CatalogItem::Connection(_) => CatalogItemType::Connection,
1774 CatalogItem::ContinualTask(_) => CatalogItemType::ContinualTask,
1775 }
1776 }
1777
1778 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1780 let gid = match self {
1781 CatalogItem::Source(source) => source.global_id,
1782 CatalogItem::Log(log) => log.global_id,
1783 CatalogItem::Sink(sink) => sink.global_id,
1784 CatalogItem::View(view) => view.global_id,
1785 CatalogItem::MaterializedView(mv) => {
1786 return itertools::Either::Left(mv.collections.values().copied());
1787 }
1788 CatalogItem::ContinualTask(ct) => ct.global_id,
1789 CatalogItem::Index(index) => index.global_id,
1790 CatalogItem::Func(func) => func.global_id,
1791 CatalogItem::Type(ty) => ty.global_id,
1792 CatalogItem::Secret(secret) => secret.global_id,
1793 CatalogItem::Connection(conn) => conn.global_id,
1794 CatalogItem::Table(table) => {
1795 return itertools::Either::Left(table.collections.values().copied());
1796 }
1797 };
1798 itertools::Either::Right(std::iter::once(gid))
1799 }
1800
1801 pub fn latest_global_id(&self) -> GlobalId {
1805 match self {
1806 CatalogItem::Source(source) => source.global_id,
1807 CatalogItem::Log(log) => log.global_id,
1808 CatalogItem::Sink(sink) => sink.global_id,
1809 CatalogItem::View(view) => view.global_id,
1810 CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
1811 CatalogItem::ContinualTask(ct) => ct.global_id,
1812 CatalogItem::Index(index) => index.global_id,
1813 CatalogItem::Func(func) => func.global_id,
1814 CatalogItem::Type(ty) => ty.global_id,
1815 CatalogItem::Secret(secret) => secret.global_id,
1816 CatalogItem::Connection(conn) => conn.global_id,
1817 CatalogItem::Table(table) => table.global_id_writes(),
1818 }
1819 }
1820
1821 pub fn optimized_plan(&self) -> Option<&Arc<DataflowDescription<OptimizedMirRelationExpr>>> {
1823 match self {
1824 CatalogItem::Index(idx) => idx.optimized_plan.as_ref(),
1825 CatalogItem::MaterializedView(mv) => mv.optimized_plan.as_ref(),
1826 CatalogItem::ContinualTask(ct) => ct.optimized_plan.as_ref(),
1827 _ => None,
1828 }
1829 }
1830
1831 pub fn physical_plan(&self) -> Option<&Arc<DataflowDescription<ComputePlan>>> {
1833 match self {
1834 CatalogItem::Index(idx) => idx.physical_plan.as_ref(),
1835 CatalogItem::MaterializedView(mv) => mv.physical_plan.as_ref(),
1836 CatalogItem::ContinualTask(ct) => ct.physical_plan.as_ref(),
1837 _ => None,
1838 }
1839 }
1840
1841 pub fn dataflow_metainfo(&self) -> Option<&DataflowMetainfo<Arc<OptimizerNotice>>> {
1843 match self {
1844 CatalogItem::Index(idx) => idx.dataflow_metainfo.as_ref(),
1845 CatalogItem::MaterializedView(mv) => mv.dataflow_metainfo.as_ref(),
1846 CatalogItem::ContinualTask(ct) => ct.dataflow_metainfo.as_ref(),
1847 _ => None,
1848 }
1849 }
1850
1851 pub fn plan_fields_mut(
1856 &mut self,
1857 ) -> Option<(
1858 &mut Option<Arc<DataflowDescription<OptimizedMirRelationExpr>>>,
1859 &mut Option<Arc<DataflowDescription<ComputePlan>>>,
1860 &mut Option<DataflowMetainfo<Arc<OptimizerNotice>>>,
1861 )> {
1862 match self {
1863 CatalogItem::Index(idx) => Some((
1864 &mut idx.optimized_plan,
1865 &mut idx.physical_plan,
1866 &mut idx.dataflow_metainfo,
1867 )),
1868 CatalogItem::MaterializedView(mv) => Some((
1869 &mut mv.optimized_plan,
1870 &mut mv.physical_plan,
1871 &mut mv.dataflow_metainfo,
1872 )),
1873 CatalogItem::ContinualTask(ct) => Some((
1874 &mut ct.optimized_plan,
1875 &mut ct.physical_plan,
1876 &mut ct.dataflow_metainfo,
1877 )),
1878 _ => None,
1879 }
1880 }
1881
1882 pub fn is_storage_collection(&self) -> bool {
1884 match self {
1885 CatalogItem::Table(_)
1886 | CatalogItem::Source(_)
1887 | CatalogItem::MaterializedView(_)
1888 | CatalogItem::Sink(_)
1889 | CatalogItem::ContinualTask(_) => true,
1890 CatalogItem::Log(_)
1891 | CatalogItem::View(_)
1892 | CatalogItem::Index(_)
1893 | CatalogItem::Type(_)
1894 | CatalogItem::Func(_)
1895 | CatalogItem::Secret(_)
1896 | CatalogItem::Connection(_) => false,
1897 }
1898 }
1899
1900 pub fn relation_desc(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1909 match &self {
1910 CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1911 CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1912 CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1913 CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1914 CatalogItem::MaterializedView(mview) => {
1915 Some(Cow::Owned(mview.desc.at_version(version)))
1916 }
1917 CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
1918 CatalogItem::Func(_)
1919 | CatalogItem::Index(_)
1920 | CatalogItem::Sink(_)
1921 | CatalogItem::Secret(_)
1922 | CatalogItem::Connection(_)
1923 | CatalogItem::Type(_) => None,
1924 }
1925 }
1926
1927 pub fn func(
1928 &self,
1929 entry: &CatalogEntry,
1930 ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1931 match &self {
1932 CatalogItem::Func(func) => Ok(func.inner),
1933 _ => Err(SqlCatalogError::UnexpectedType {
1934 name: entry.name().item.to_string(),
1935 actual_type: entry.item_type(),
1936 expected_type: CatalogItemType::Func,
1937 }),
1938 }
1939 }
1940
1941 pub fn source_desc(
1942 &self,
1943 entry: &CatalogEntry,
1944 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1945 match &self {
1946 CatalogItem::Source(source) => match &source.data_source {
1947 DataSourceDesc::Ingestion { desc, .. }
1948 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1949 DataSourceDesc::IngestionExport { .. }
1950 | DataSourceDesc::Introspection(_)
1951 | DataSourceDesc::Webhook { .. }
1952 | DataSourceDesc::Progress
1953 | DataSourceDesc::Catalog => Ok(None),
1954 },
1955 _ => Err(SqlCatalogError::UnexpectedType {
1956 name: entry.name().item.to_string(),
1957 actual_type: entry.item_type(),
1958 expected_type: CatalogItemType::Source,
1959 }),
1960 }
1961 }
1962
1963 pub fn is_progress_source(&self) -> bool {
1965 matches!(
1966 self,
1967 CatalogItem::Source(Source {
1968 data_source: DataSourceDesc::Progress,
1969 ..
1970 })
1971 )
1972 }
1973
1974 pub fn references(&self) -> &ResolvedIds {
1977 static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1978 match self {
1979 CatalogItem::Func(_) => &*EMPTY,
1980 CatalogItem::Index(idx) => &idx.resolved_ids,
1981 CatalogItem::Sink(sink) => &sink.resolved_ids,
1982 CatalogItem::Source(source) => &source.resolved_ids,
1983 CatalogItem::Log(_) => &*EMPTY,
1984 CatalogItem::Table(table) => &table.resolved_ids,
1985 CatalogItem::Type(typ) => &typ.resolved_ids,
1986 CatalogItem::View(view) => &view.resolved_ids,
1987 CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1988 CatalogItem::Secret(_) => &*EMPTY,
1989 CatalogItem::Connection(connection) => &connection.resolved_ids,
1990 CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
1991 }
1992 }
1993
1994 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2000 let mut uses: BTreeSet<_> = self.references().items().copied().collect();
2001 match self {
2002 CatalogItem::Func(_) => {}
2005 CatalogItem::Index(_) => {}
2006 CatalogItem::Sink(_) => {}
2007 CatalogItem::Source(_) => {}
2008 CatalogItem::Log(_) => {}
2009 CatalogItem::Table(_) => {}
2010 CatalogItem::Type(_) => {}
2011 CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
2012 CatalogItem::MaterializedView(mview) => {
2013 uses.extend(mview.dependencies.0.iter().copied())
2014 }
2015 CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
2016 CatalogItem::Secret(_) => {}
2017 CatalogItem::Connection(_) => {}
2018 }
2019 uses
2020 }
2021
2022 pub fn conn_id(&self) -> Option<&ConnectionId> {
2025 match self {
2026 CatalogItem::View(view) => view.conn_id.as_ref(),
2027 CatalogItem::Index(index) => index.conn_id.as_ref(),
2028 CatalogItem::Table(table) => table.conn_id.as_ref(),
2029 CatalogItem::Log(_)
2030 | CatalogItem::Source(_)
2031 | CatalogItem::Sink(_)
2032 | CatalogItem::MaterializedView(_)
2033 | CatalogItem::Secret(_)
2034 | CatalogItem::Type(_)
2035 | CatalogItem::Func(_)
2036 | CatalogItem::Connection(_)
2037 | CatalogItem::ContinualTask(_) => None,
2038 }
2039 }
2040
2041 pub fn set_conn_id(&mut self, conn_id: Option<ConnectionId>) {
2044 match self {
2045 CatalogItem::View(view) => view.conn_id = conn_id,
2046 CatalogItem::Index(index) => index.conn_id = conn_id,
2047 CatalogItem::Table(table) => table.conn_id = conn_id,
2048 CatalogItem::Log(_)
2049 | CatalogItem::Source(_)
2050 | CatalogItem::Sink(_)
2051 | CatalogItem::MaterializedView(_)
2052 | CatalogItem::Secret(_)
2053 | CatalogItem::Type(_)
2054 | CatalogItem::Func(_)
2055 | CatalogItem::Connection(_)
2056 | CatalogItem::ContinualTask(_) => (),
2057 }
2058 }
2059
2060 pub fn is_temporary(&self) -> bool {
2062 self.conn_id().is_some()
2063 }
2064
2065 pub fn rename_schema_refs(
2066 &self,
2067 database_name: &str,
2068 cur_schema_name: &str,
2069 new_schema_name: &str,
2070 ) -> Result<CatalogItem, (String, String)> {
2071 let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
2072 let mut create_stmt = mz_sql::parse::parse(&create_sql)
2073 .expect("invalid create sql persisted to catalog")
2074 .into_element()
2075 .ast;
2076
2077 mz_sql::ast::transform::create_stmt_rename_schema_refs(
2079 &mut create_stmt,
2080 database_name,
2081 cur_schema_name,
2082 new_schema_name,
2083 )?;
2084
2085 Ok(create_stmt.to_ast_string_stable())
2086 };
2087
2088 match self {
2089 CatalogItem::Table(i) => {
2090 let mut i = i.clone();
2091 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2092 Ok(CatalogItem::Table(i))
2093 }
2094 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
2095 CatalogItem::Source(i) => {
2096 let mut i = i.clone();
2097 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2098 Ok(CatalogItem::Source(i))
2099 }
2100 CatalogItem::Sink(i) => {
2101 let mut i = i.clone();
2102 i.create_sql = do_rewrite(i.create_sql)?;
2103 Ok(CatalogItem::Sink(i))
2104 }
2105 CatalogItem::View(i) => {
2106 let mut i = i.clone();
2107 i.create_sql = do_rewrite(i.create_sql)?;
2108 Ok(CatalogItem::View(i))
2109 }
2110 CatalogItem::MaterializedView(i) => {
2111 let mut i = i.clone();
2112 i.create_sql = do_rewrite(i.create_sql)?;
2113 Ok(CatalogItem::MaterializedView(i))
2114 }
2115 CatalogItem::Index(i) => {
2116 let mut i = i.clone();
2117 i.create_sql = do_rewrite(i.create_sql)?;
2118 Ok(CatalogItem::Index(i))
2119 }
2120 CatalogItem::Secret(i) => {
2121 let mut i = i.clone();
2122 i.create_sql = do_rewrite(i.create_sql)?;
2123 Ok(CatalogItem::Secret(i))
2124 }
2125 CatalogItem::Connection(i) => {
2126 let mut i = i.clone();
2127 i.create_sql = do_rewrite(i.create_sql)?;
2128 Ok(CatalogItem::Connection(i))
2129 }
2130 CatalogItem::Type(i) => {
2131 let mut i = i.clone();
2132 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2133 Ok(CatalogItem::Type(i))
2134 }
2135 CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
2136 CatalogItem::ContinualTask(i) => {
2137 let mut i = i.clone();
2138 i.create_sql = do_rewrite(i.create_sql)?;
2139 Ok(CatalogItem::ContinualTask(i))
2140 }
2141 }
2142 }
2143
2144 pub fn rename_item_refs(
2148 &self,
2149 from: FullItemName,
2150 to_item_name: String,
2151 rename_self: bool,
2152 ) -> Result<CatalogItem, String> {
2153 let do_rewrite = |create_sql: String| -> Result<String, String> {
2154 let mut create_stmt = mz_sql::parse::parse(&create_sql)
2155 .expect("invalid create sql persisted to catalog")
2156 .into_element()
2157 .ast;
2158 if rename_self {
2159 mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
2160 }
2161 mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
2163 Ok(create_stmt.to_ast_string_stable())
2164 };
2165
2166 match self {
2167 CatalogItem::Table(i) => {
2168 let mut i = i.clone();
2169 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2170 Ok(CatalogItem::Table(i))
2171 }
2172 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
2173 CatalogItem::Source(i) => {
2174 let mut i = i.clone();
2175 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2176 Ok(CatalogItem::Source(i))
2177 }
2178 CatalogItem::Sink(i) => {
2179 let mut i = i.clone();
2180 i.create_sql = do_rewrite(i.create_sql)?;
2181 Ok(CatalogItem::Sink(i))
2182 }
2183 CatalogItem::View(i) => {
2184 let mut i = i.clone();
2185 i.create_sql = do_rewrite(i.create_sql)?;
2186 Ok(CatalogItem::View(i))
2187 }
2188 CatalogItem::MaterializedView(i) => {
2189 let mut i = i.clone();
2190 i.create_sql = do_rewrite(i.create_sql)?;
2191 Ok(CatalogItem::MaterializedView(i))
2192 }
2193 CatalogItem::Index(i) => {
2194 let mut i = i.clone();
2195 i.create_sql = do_rewrite(i.create_sql)?;
2196 Ok(CatalogItem::Index(i))
2197 }
2198 CatalogItem::Secret(i) => {
2199 let mut i = i.clone();
2200 i.create_sql = do_rewrite(i.create_sql)?;
2201 Ok(CatalogItem::Secret(i))
2202 }
2203 CatalogItem::Func(_) | CatalogItem::Type(_) => {
2204 unreachable!("{}s cannot be renamed", self.typ())
2205 }
2206 CatalogItem::Connection(i) => {
2207 let mut i = i.clone();
2208 i.create_sql = do_rewrite(i.create_sql)?;
2209 Ok(CatalogItem::Connection(i))
2210 }
2211 CatalogItem::ContinualTask(i) => {
2212 let mut i = i.clone();
2213 i.create_sql = do_rewrite(i.create_sql)?;
2214 Ok(CatalogItem::ContinualTask(i))
2215 }
2216 }
2217 }
2218
2219 pub fn replace_item_refs(&self, old_id: CatalogItemId, new_id: CatalogItemId) -> CatalogItem {
2221 let do_rewrite = |create_sql: String| -> String {
2222 let mut create_stmt = mz_sql::parse::parse(&create_sql)
2223 .expect("invalid create sql persisted to catalog")
2224 .into_element()
2225 .ast;
2226 mz_sql::ast::transform::create_stmt_replace_ids(
2227 &mut create_stmt,
2228 &[(old_id, new_id)].into(),
2229 );
2230 create_stmt.to_ast_string_stable()
2231 };
2232
2233 match self {
2234 CatalogItem::Table(i) => {
2235 let mut i = i.clone();
2236 i.create_sql = i.create_sql.map(do_rewrite);
2237 CatalogItem::Table(i)
2238 }
2239 CatalogItem::Log(i) => CatalogItem::Log(i.clone()),
2240 CatalogItem::Source(i) => {
2241 let mut i = i.clone();
2242 i.create_sql = i.create_sql.map(do_rewrite);
2243 CatalogItem::Source(i)
2244 }
2245 CatalogItem::Sink(i) => {
2246 let mut i = i.clone();
2247 i.create_sql = do_rewrite(i.create_sql);
2248 CatalogItem::Sink(i)
2249 }
2250 CatalogItem::View(i) => {
2251 let mut i = i.clone();
2252 i.create_sql = do_rewrite(i.create_sql);
2253 CatalogItem::View(i)
2254 }
2255 CatalogItem::MaterializedView(i) => {
2256 let mut i = i.clone();
2257 i.create_sql = do_rewrite(i.create_sql);
2258 CatalogItem::MaterializedView(i)
2259 }
2260 CatalogItem::Index(i) => {
2261 let mut i = i.clone();
2262 i.create_sql = do_rewrite(i.create_sql);
2263 CatalogItem::Index(i)
2264 }
2265 CatalogItem::Secret(i) => {
2266 let mut i = i.clone();
2267 i.create_sql = do_rewrite(i.create_sql);
2268 CatalogItem::Secret(i)
2269 }
2270 CatalogItem::Func(_) | CatalogItem::Type(_) => {
2271 unreachable!("references of {}s cannot be replaced", self.typ())
2272 }
2273 CatalogItem::Connection(i) => {
2274 let mut i = i.clone();
2275 i.create_sql = do_rewrite(i.create_sql);
2276 CatalogItem::Connection(i)
2277 }
2278 CatalogItem::ContinualTask(i) => {
2279 let mut i = i.clone();
2280 i.create_sql = do_rewrite(i.create_sql);
2281 CatalogItem::ContinualTask(i)
2282 }
2283 }
2284 }
2285 pub fn update_retain_history(
2288 &mut self,
2289 value: Option<Value>,
2290 window: CompactionWindow,
2291 ) -> Result<Option<WithOptionValue<Raw>>, ()> {
2292 let update = |mut ast: &mut Statement<Raw>| {
2293 macro_rules! update_retain_history {
2295 ( $stmt:ident, $opt:ident, $name:ident ) => {{
2296 let pos = $stmt
2298 .with_options
2299 .iter()
2300 .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
2302 if let Some(value) = value {
2303 let next = mz_sql_parser::ast::$opt {
2304 name: mz_sql_parser::ast::$name::RetainHistory,
2305 value: Some(WithOptionValue::RetainHistoryFor(value)),
2306 };
2307 if let Some(idx) = pos {
2308 let previous = $stmt.with_options[idx].clone();
2309 $stmt.with_options[idx] = next;
2310 previous.value
2311 } else {
2312 $stmt.with_options.push(next);
2313 None
2314 }
2315 } else {
2316 if let Some(idx) = pos {
2317 $stmt.with_options.swap_remove(idx).value
2318 } else {
2319 None
2320 }
2321 }
2322 }};
2323 }
2324 let previous = match &mut ast {
2325 Statement::CreateTable(stmt) => {
2326 update_retain_history!(stmt, TableOption, TableOptionName)
2327 }
2328 Statement::CreateIndex(stmt) => {
2329 update_retain_history!(stmt, IndexOption, IndexOptionName)
2330 }
2331 Statement::CreateSource(stmt) => {
2332 update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
2333 }
2334 Statement::CreateMaterializedView(stmt) => {
2335 update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
2336 }
2337 _ => {
2338 return Err(());
2339 }
2340 };
2341 Ok(previous)
2342 };
2343
2344 let res = self.update_sql(update)?;
2345 let cw = self
2346 .custom_logical_compaction_window_mut()
2347 .expect("item must have compaction window");
2348 *cw = Some(window);
2349 Ok(res)
2350 }
2351
2352 pub fn update_timestamp_interval(
2354 &mut self,
2355 value: Option<Value>,
2356 interval: Duration,
2357 ) -> Result<(), ()> {
2358 let update = |ast: &mut Statement<Raw>| {
2359 match ast {
2360 Statement::CreateSource(stmt) => {
2361 let pos = stmt.with_options.iter().rposition(|o| {
2362 o.name == mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval
2363 });
2364 if let Some(value) = value {
2365 let next = mz_sql_parser::ast::CreateSourceOption {
2366 name: mz_sql_parser::ast::CreateSourceOptionName::TimestampInterval,
2367 value: Some(WithOptionValue::Value(value)),
2368 };
2369 if let Some(idx) = pos {
2370 stmt.with_options[idx] = next;
2371 } else {
2372 stmt.with_options.push(next);
2373 }
2374 } else {
2375 if let Some(idx) = pos {
2376 stmt.with_options.swap_remove(idx);
2377 }
2378 }
2379 }
2380 _ => return Err(()),
2381 };
2382 Ok(())
2383 };
2384
2385 self.update_sql(update)?;
2386
2387 match self {
2389 CatalogItem::Source(source) => {
2390 match &mut source.data_source {
2391 DataSourceDesc::Ingestion { desc, .. }
2392 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
2393 desc.timestamp_interval = interval;
2394 }
2395 _ => return Err(()),
2396 }
2397 Ok(())
2398 }
2399 _ => Err(()),
2400 }
2401 }
2402
2403 pub fn add_column(
2404 &mut self,
2405 name: ColumnName,
2406 typ: SqlColumnType,
2407 sql: RawDataType,
2408 ) -> Result<RelationVersion, PlanError> {
2409 let CatalogItem::Table(table) = self else {
2410 return Err(PlanError::Unsupported {
2411 feature: "adding columns to a non-Table".to_string(),
2412 discussion_no: None,
2413 });
2414 };
2415 let next_version = table.desc.add_column(name.clone(), typ);
2416
2417 let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2418 Statement::CreateTable(stmt) => {
2419 let version = ColumnOptionDef {
2420 name: None,
2421 option: ColumnOption::Versioned {
2422 action: ColumnVersioned::Added,
2423 version: next_version.into(),
2424 },
2425 };
2426 let column = ColumnDef {
2427 name: name.into(),
2428 data_type: sql,
2429 collation: None,
2430 options: vec![version],
2431 };
2432 stmt.columns.push(column);
2433 Ok(())
2434 }
2435 _ => Err(()),
2436 };
2437
2438 self.update_sql(update)
2439 .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2440 Ok(next_version)
2441 }
2442
2443 pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2446 where
2447 F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2448 {
2449 let create_sql = match self {
2450 CatalogItem::Table(Table { create_sql, .. })
2451 | CatalogItem::Type(Type { create_sql, .. })
2452 | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2453 CatalogItem::Sink(Sink { create_sql, .. })
2454 | CatalogItem::View(View { create_sql, .. })
2455 | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2456 | CatalogItem::Index(Index { create_sql, .. })
2457 | CatalogItem::Secret(Secret { create_sql, .. })
2458 | CatalogItem::Connection(Connection { create_sql, .. })
2459 | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
2460 CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2461 };
2462 let Some(create_sql) = create_sql else {
2463 return Err(());
2464 };
2465 let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2466 .expect("non-system items must be parseable")
2467 .into_element()
2468 .ast;
2469 debug!("rewrite: {}", ast.to_ast_string_redacted());
2470 let t = f(&mut ast)?;
2471 *create_sql = ast.to_ast_string_stable();
2472 debug!("rewrote: {}", ast.to_ast_string_redacted());
2473 Ok(t)
2474 }
2475
2476 pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2483 match self {
2484 CatalogItem::Index(index) => Some(index.cluster_id),
2485 CatalogItem::Table(_)
2486 | CatalogItem::Source(_)
2487 | CatalogItem::Log(_)
2488 | CatalogItem::View(_)
2489 | CatalogItem::MaterializedView(_)
2490 | CatalogItem::Sink(_)
2491 | CatalogItem::Type(_)
2492 | CatalogItem::Func(_)
2493 | CatalogItem::Secret(_)
2494 | CatalogItem::Connection(_)
2495 | CatalogItem::ContinualTask(_) => None,
2496 }
2497 }
2498
2499 pub fn cluster_id(&self) -> Option<ClusterId> {
2500 match self {
2501 CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2502 CatalogItem::Index(index) => Some(index.cluster_id),
2503 CatalogItem::Source(source) => match &source.data_source {
2504 DataSourceDesc::Ingestion { cluster_id, .. }
2505 | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
2506 DataSourceDesc::IngestionExport { .. } => None,
2510 DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2511 DataSourceDesc::Introspection(_)
2512 | DataSourceDesc::Progress
2513 | DataSourceDesc::Catalog => None,
2514 },
2515 CatalogItem::Sink(sink) => Some(sink.cluster_id),
2516 CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
2517 CatalogItem::Table(_)
2518 | CatalogItem::Log(_)
2519 | CatalogItem::View(_)
2520 | CatalogItem::Type(_)
2521 | CatalogItem::Func(_)
2522 | CatalogItem::Secret(_)
2523 | CatalogItem::Connection(_) => None,
2524 }
2525 }
2526
2527 pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2530 match self {
2531 CatalogItem::Table(table) => table.custom_logical_compaction_window,
2532 CatalogItem::Source(source) => source.custom_logical_compaction_window,
2533 CatalogItem::Index(index) => index.custom_logical_compaction_window,
2534 CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2535 CatalogItem::Log(_)
2536 | CatalogItem::View(_)
2537 | CatalogItem::Sink(_)
2538 | CatalogItem::Type(_)
2539 | CatalogItem::Func(_)
2540 | CatalogItem::Secret(_)
2541 | CatalogItem::Connection(_)
2542 | CatalogItem::ContinualTask(_) => None,
2543 }
2544 }
2545
2546 pub fn custom_logical_compaction_window_mut(
2550 &mut self,
2551 ) -> Option<&mut Option<CompactionWindow>> {
2552 let cw = match self {
2553 CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2554 CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2555 CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2556 CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2557 CatalogItem::Log(_)
2558 | CatalogItem::View(_)
2559 | CatalogItem::Sink(_)
2560 | CatalogItem::Type(_)
2561 | CatalogItem::Func(_)
2562 | CatalogItem::Secret(_)
2563 | CatalogItem::Connection(_)
2564 | CatalogItem::ContinualTask(_) => return None,
2565 };
2566 Some(cw)
2567 }
2568
2569 pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2577 let custom_logical_compaction_window = match self {
2578 CatalogItem::Table(_)
2579 | CatalogItem::Source(_)
2580 | CatalogItem::Index(_)
2581 | CatalogItem::MaterializedView(_)
2582 | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
2583 CatalogItem::Log(_)
2584 | CatalogItem::View(_)
2585 | CatalogItem::Sink(_)
2586 | CatalogItem::Type(_)
2587 | CatalogItem::Func(_)
2588 | CatalogItem::Secret(_)
2589 | CatalogItem::Connection(_) => return None,
2590 };
2591 Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2592 }
2593
2594 pub fn is_retained_metrics_object(&self) -> bool {
2598 match self {
2599 CatalogItem::Table(table) => table.is_retained_metrics_object,
2600 CatalogItem::Source(source) => source.is_retained_metrics_object,
2601 CatalogItem::Index(index) => index.is_retained_metrics_object,
2602 CatalogItem::Log(_)
2603 | CatalogItem::View(_)
2604 | CatalogItem::MaterializedView(_)
2605 | CatalogItem::Sink(_)
2606 | CatalogItem::Type(_)
2607 | CatalogItem::Func(_)
2608 | CatalogItem::Secret(_)
2609 | CatalogItem::Connection(_)
2610 | CatalogItem::ContinualTask(_) => false,
2611 }
2612 }
2613
2614 pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2615 match self {
2616 CatalogItem::Table(table) => {
2617 let create_sql = table
2618 .create_sql
2619 .clone()
2620 .expect("builtin tables cannot be serialized");
2621 let mut collections = table.collections.clone();
2622 let global_id = collections
2623 .remove(&RelationVersion::root())
2624 .expect("at least one version");
2625 (create_sql, global_id, collections)
2626 }
2627 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2628 CatalogItem::Source(source) => {
2629 assert!(
2630 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2631 "cannot serialize introspection/builtin sources",
2632 );
2633 let create_sql = source
2634 .create_sql
2635 .clone()
2636 .expect("builtin sources cannot be serialized");
2637 (create_sql, source.global_id, BTreeMap::new())
2638 }
2639 CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2640 CatalogItem::MaterializedView(mview) => {
2641 let mut collections = mview.collections.clone();
2642 let global_id = collections
2643 .remove(&RelationVersion::root())
2644 .expect("at least one version");
2645 (mview.create_sql.clone(), global_id, collections)
2646 }
2647 CatalogItem::Index(index) => {
2648 (index.create_sql.clone(), index.global_id, BTreeMap::new())
2649 }
2650 CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2651 CatalogItem::Type(typ) => {
2652 let create_sql = typ
2653 .create_sql
2654 .clone()
2655 .expect("builtin types cannot be serialized");
2656 (create_sql, typ.global_id, BTreeMap::new())
2657 }
2658 CatalogItem::Secret(secret) => {
2659 (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2660 }
2661 CatalogItem::Connection(connection) => (
2662 connection.create_sql.clone(),
2663 connection.global_id,
2664 BTreeMap::new(),
2665 ),
2666 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2667 CatalogItem::ContinualTask(ct) => {
2668 (ct.create_sql.clone(), ct.global_id, BTreeMap::new())
2669 }
2670 }
2671 }
2672
2673 pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2674 match self {
2675 CatalogItem::Table(mut table) => {
2676 let create_sql = table
2677 .create_sql
2678 .expect("builtin tables cannot be serialized");
2679 let global_id = table
2680 .collections
2681 .remove(&RelationVersion::root())
2682 .expect("at least one version");
2683 (create_sql, global_id, table.collections)
2684 }
2685 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2686 CatalogItem::Source(source) => {
2687 assert!(
2688 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2689 "cannot serialize introspection/builtin sources",
2690 );
2691 let create_sql = source
2692 .create_sql
2693 .expect("builtin sources cannot be serialized");
2694 (create_sql, source.global_id, BTreeMap::new())
2695 }
2696 CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2697 CatalogItem::MaterializedView(mut mview) => {
2698 let global_id = mview
2699 .collections
2700 .remove(&RelationVersion::root())
2701 .expect("at least one version");
2702 (mview.create_sql, global_id, mview.collections)
2703 }
2704 CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2705 CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2706 CatalogItem::Type(typ) => {
2707 let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2708 (create_sql, typ.global_id, BTreeMap::new())
2709 }
2710 CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2711 CatalogItem::Connection(connection) => {
2712 (connection.create_sql, connection.global_id, BTreeMap::new())
2713 }
2714 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2715 CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
2716 }
2717 }
2718
2719 pub fn global_id_for_version(&self, version: RelationVersionSelector) -> Option<GlobalId> {
2722 let collections = match self {
2723 CatalogItem::MaterializedView(mv) => &mv.collections,
2724 CatalogItem::Table(table) => &table.collections,
2725 CatalogItem::Source(source) => return Some(source.global_id),
2726 CatalogItem::Log(log) => return Some(log.global_id),
2727 CatalogItem::View(view) => return Some(view.global_id),
2728 CatalogItem::Sink(sink) => return Some(sink.global_id),
2729 CatalogItem::Index(index) => return Some(index.global_id),
2730 CatalogItem::Type(ty) => return Some(ty.global_id),
2731 CatalogItem::Func(func) => return Some(func.global_id),
2732 CatalogItem::Secret(secret) => return Some(secret.global_id),
2733 CatalogItem::Connection(conn) => return Some(conn.global_id),
2734 CatalogItem::ContinualTask(ct) => return Some(ct.global_id),
2735 };
2736 match version {
2737 RelationVersionSelector::Latest => collections.values().last().copied(),
2738 RelationVersionSelector::Specific(version) => collections.get(&version).copied(),
2739 }
2740 }
2741}
2742
2743impl CatalogEntry {
2744 pub fn relation_desc_latest(&self) -> Option<Cow<'_, RelationDesc>> {
2747 self.item.relation_desc(RelationVersionSelector::Latest)
2748 }
2749
2750 pub fn has_columns(&self) -> bool {
2752 match self.item() {
2753 CatalogItem::Type(Type { details, .. }) => {
2754 matches!(details.typ, CatalogType::Record { .. })
2755 }
2756 _ => self.relation_desc_latest().is_some(),
2757 }
2758 }
2759
2760 pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2762 self.item.func(self)
2763 }
2764
2765 pub fn index(&self) -> Option<&Index> {
2767 match self.item() {
2768 CatalogItem::Index(idx) => Some(idx),
2769 _ => None,
2770 }
2771 }
2772
2773 pub fn materialized_view(&self) -> Option<&MaterializedView> {
2775 match self.item() {
2776 CatalogItem::MaterializedView(mv) => Some(mv),
2777 _ => None,
2778 }
2779 }
2780
2781 pub fn table(&self) -> Option<&Table> {
2783 match self.item() {
2784 CatalogItem::Table(tbl) => Some(tbl),
2785 _ => None,
2786 }
2787 }
2788
2789 pub fn source(&self) -> Option<&Source> {
2791 match self.item() {
2792 CatalogItem::Source(src) => Some(src),
2793 _ => None,
2794 }
2795 }
2796
2797 pub fn sink(&self) -> Option<&Sink> {
2799 match self.item() {
2800 CatalogItem::Sink(sink) => Some(sink),
2801 _ => None,
2802 }
2803 }
2804
2805 pub fn secret(&self) -> Option<&Secret> {
2807 match self.item() {
2808 CatalogItem::Secret(secret) => Some(secret),
2809 _ => None,
2810 }
2811 }
2812
2813 pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2814 match self.item() {
2815 CatalogItem::Connection(connection) => Ok(connection),
2816 _ => {
2817 let db_name = match self.name().qualifiers.database_spec {
2818 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2819 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2820 };
2821 Err(SqlCatalogError::UnknownConnection(format!(
2822 "{}{}.{}",
2823 db_name,
2824 self.name().qualifiers.schema_spec,
2825 self.name().item
2826 )))
2827 }
2828 }
2829 }
2830
2831 pub fn source_desc(
2834 &self,
2835 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2836 self.item.source_desc(self)
2837 }
2838
2839 pub fn is_connection(&self) -> bool {
2841 matches!(self.item(), CatalogItem::Connection(_))
2842 }
2843
2844 pub fn is_table(&self) -> bool {
2846 matches!(self.item(), CatalogItem::Table(_))
2847 }
2848
2849 pub fn is_source(&self) -> bool {
2852 matches!(self.item(), CatalogItem::Source(_))
2853 }
2854
2855 pub fn subsource_details(
2858 &self,
2859 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2860 match &self.item() {
2861 CatalogItem::Source(source) => match &source.data_source {
2862 DataSourceDesc::IngestionExport {
2863 ingestion_id,
2864 external_reference,
2865 details,
2866 data_config: _,
2867 } => Some((*ingestion_id, external_reference, details)),
2868 _ => None,
2869 },
2870 _ => None,
2871 }
2872 }
2873
2874 pub fn source_export_details(
2877 &self,
2878 ) -> Option<(
2879 CatalogItemId,
2880 &UnresolvedItemName,
2881 &SourceExportDetails,
2882 &SourceExportDataConfig<ReferencedConnection>,
2883 )> {
2884 match &self.item() {
2885 CatalogItem::Source(source) => match &source.data_source {
2886 DataSourceDesc::IngestionExport {
2887 ingestion_id,
2888 external_reference,
2889 details,
2890 data_config,
2891 } => Some((*ingestion_id, external_reference, details, data_config)),
2892 _ => None,
2893 },
2894 CatalogItem::Table(table) => match &table.data_source {
2895 TableDataSource::DataSource {
2896 desc:
2897 DataSourceDesc::IngestionExport {
2898 ingestion_id,
2899 external_reference,
2900 details,
2901 data_config,
2902 },
2903 timeline: _,
2904 } => Some((*ingestion_id, external_reference, details, data_config)),
2905 _ => None,
2906 },
2907 _ => None,
2908 }
2909 }
2910
2911 pub fn is_progress_source(&self) -> bool {
2913 self.item().is_progress_source()
2914 }
2915
2916 pub fn progress_id(&self) -> Option<CatalogItemId> {
2918 match &self.item() {
2919 CatalogItem::Source(source) => match &source.data_source {
2920 DataSourceDesc::Ingestion { .. } => Some(self.id),
2921 DataSourceDesc::OldSyntaxIngestion {
2922 progress_subsource, ..
2923 } => Some(*progress_subsource),
2924 DataSourceDesc::IngestionExport { .. }
2925 | DataSourceDesc::Introspection(_)
2926 | DataSourceDesc::Progress
2927 | DataSourceDesc::Webhook { .. }
2928 | DataSourceDesc::Catalog => None,
2929 },
2930 CatalogItem::Table(_)
2931 | CatalogItem::Log(_)
2932 | CatalogItem::View(_)
2933 | CatalogItem::MaterializedView(_)
2934 | CatalogItem::Sink(_)
2935 | CatalogItem::Index(_)
2936 | CatalogItem::Type(_)
2937 | CatalogItem::Func(_)
2938 | CatalogItem::Secret(_)
2939 | CatalogItem::Connection(_)
2940 | CatalogItem::ContinualTask(_) => None,
2941 }
2942 }
2943
2944 pub fn is_sink(&self) -> bool {
2946 matches!(self.item(), CatalogItem::Sink(_))
2947 }
2948
2949 pub fn is_materialized_view(&self) -> bool {
2951 matches!(self.item(), CatalogItem::MaterializedView(_))
2952 }
2953
2954 pub fn is_view(&self) -> bool {
2956 matches!(self.item(), CatalogItem::View(_))
2957 }
2958
2959 pub fn is_secret(&self) -> bool {
2961 matches!(self.item(), CatalogItem::Secret(_))
2962 }
2963
2964 pub fn is_introspection_source(&self) -> bool {
2966 matches!(self.item(), CatalogItem::Log(_))
2967 }
2968
2969 pub fn is_index(&self) -> bool {
2971 matches!(self.item(), CatalogItem::Index(_))
2972 }
2973
2974 pub fn is_continual_task(&self) -> bool {
2976 matches!(self.item(), CatalogItem::ContinualTask(_))
2977 }
2978
2979 pub fn is_relation(&self) -> bool {
2981 mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2982 }
2983
2984 pub fn references(&self) -> &ResolvedIds {
2987 self.item.references()
2988 }
2989
2990 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2996 self.item.uses()
2997 }
2998
2999 pub fn item(&self) -> &CatalogItem {
3001 &self.item
3002 }
3003
3004 pub fn item_mut(&mut self) -> &mut CatalogItem {
3007 &mut self.item
3008 }
3009
3010 pub fn id(&self) -> CatalogItemId {
3012 self.id
3013 }
3014
3015 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
3017 self.item().global_ids()
3018 }
3019
3020 pub fn latest_global_id(&self) -> GlobalId {
3021 self.item().latest_global_id()
3022 }
3023
3024 pub fn oid(&self) -> u32 {
3026 self.oid
3027 }
3028
3029 pub fn name(&self) -> &QualifiedItemName {
3031 &self.name
3032 }
3033
3034 pub fn referenced_by(&self) -> &[CatalogItemId] {
3036 &self.referenced_by
3037 }
3038
3039 pub fn used_by(&self) -> &[CatalogItemId] {
3041 &self.used_by
3042 }
3043
3044 pub fn conn_id(&self) -> Option<&ConnectionId> {
3047 self.item.conn_id()
3048 }
3049
3050 pub fn owner_id(&self) -> &RoleId {
3052 &self.owner_id
3053 }
3054
3055 pub fn privileges(&self) -> &PrivilegeMap {
3057 &self.privileges
3058 }
3059
3060 pub fn comment_object_id(&self) -> CommentObjectId {
3062 use CatalogItemType::*;
3063 match self.item_type() {
3064 Table => CommentObjectId::Table(self.id),
3065 Source => CommentObjectId::Source(self.id),
3066 Sink => CommentObjectId::Sink(self.id),
3067 View => CommentObjectId::View(self.id),
3068 MaterializedView => CommentObjectId::MaterializedView(self.id),
3069 Index => CommentObjectId::Index(self.id),
3070 Func => CommentObjectId::Func(self.id),
3071 Connection => CommentObjectId::Connection(self.id),
3072 Type => CommentObjectId::Type(self.id),
3073 Secret => CommentObjectId::Secret(self.id),
3074 ContinualTask => CommentObjectId::ContinualTask(self.id),
3075 }
3076 }
3077}
3078
3079#[derive(Debug, Clone, Default)]
3080pub struct CommentsMap {
3081 map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
3082}
3083
3084impl CommentsMap {
3085 pub fn update_comment(
3086 &mut self,
3087 object_id: CommentObjectId,
3088 sub_component: Option<usize>,
3089 comment: Option<String>,
3090 ) -> Option<String> {
3091 let object_comments = self.map.entry(object_id).or_default();
3092
3093 let (empty, prev) = if let Some(comment) = comment {
3095 let prev = object_comments.insert(sub_component, comment);
3096 (false, prev)
3097 } else {
3098 let prev = object_comments.remove(&sub_component);
3099 (object_comments.is_empty(), prev)
3100 };
3101
3102 if empty {
3104 self.map.remove(&object_id);
3105 }
3106
3107 prev
3109 }
3110
3111 pub fn drop_comments(
3117 &mut self,
3118 object_ids: &BTreeSet<CommentObjectId>,
3119 ) -> Vec<(CommentObjectId, Option<usize>, String)> {
3120 let mut removed_comments = Vec::new();
3121
3122 for object_id in object_ids {
3123 if let Some(comments) = self.map.remove(object_id) {
3124 let removed = comments
3125 .into_iter()
3126 .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
3127 removed_comments.extend(removed);
3128 }
3129 }
3130
3131 removed_comments
3132 }
3133
3134 pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
3135 self.map
3136 .iter()
3137 .map(|(id, comments)| {
3138 comments
3139 .iter()
3140 .map(|(pos, comment)| (*id, *pos, comment.as_str()))
3141 })
3142 .flatten()
3143 }
3144
3145 pub fn get_object_comments(
3146 &self,
3147 object_id: CommentObjectId,
3148 ) -> Option<&BTreeMap<Option<usize>, String>> {
3149 self.map.get(&object_id)
3150 }
3151}
3152
3153impl Serialize for CommentsMap {
3154 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
3155 where
3156 S: serde::Serializer,
3157 {
3158 let comment_count = self
3159 .map
3160 .iter()
3161 .map(|(_object_id, comments)| comments.len())
3162 .sum();
3163
3164 let mut seq = serializer.serialize_seq(Some(comment_count))?;
3165 for (object_id, sub) in &self.map {
3166 for (sub_component, comment) in sub {
3167 seq.serialize_element(&(
3168 format!("{object_id:?}"),
3169 format!("{sub_component:?}"),
3170 comment,
3171 ))?;
3172 }
3173 }
3174 seq.end()
3175 }
3176}
3177
/// Default privileges, keyed by the object description they apply to.
///
/// Each entry maps to the per-grantee privileges for that object
/// description.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
pub struct DefaultPrivileges {
    // Keys are not plain strings, so they are stringified for serialization.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
}
3183
/// Per-grantee default privileges: a newtype around a map from grantee role
/// to its ACL item.
///
/// Wrapping the inner map in its own type lets it carry its own
/// `serialize_with` attribute, separate from the one on the outer map.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
struct RoleDefaultPrivileges(
    // Keys are stringified for serialization, like the outer map's keys.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
);

/// Gives read access to the wrapped map's API directly on the newtype.
impl Deref for RoleDefaultPrivileges {
    type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

/// Gives mutable access to the wrapped map's API directly on the newtype.
impl DerefMut for RoleDefaultPrivileges {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
3206
3207impl DefaultPrivileges {
3208 pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
3210 if privilege.acl_mode.is_empty() {
3211 return;
3212 }
3213
3214 let privileges = self.privileges.entry(object).or_default();
3215 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3216 default_privilege.acl_mode |= privilege.acl_mode;
3217 } else {
3218 privileges.insert(privilege.grantee, privilege);
3219 }
3220 }
3221
3222 pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
3224 if let Some(privileges) = self.privileges.get_mut(object) {
3225 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
3226 default_privilege.acl_mode =
3227 default_privilege.acl_mode.difference(privilege.acl_mode);
3228 if default_privilege.acl_mode.is_empty() {
3229 privileges.remove(&privilege.grantee);
3230 }
3231 }
3232 if privileges.is_empty() {
3233 self.privileges.remove(object);
3234 }
3235 }
3236 }
3237
3238 pub fn get_privileges_for_grantee(
3241 &self,
3242 object: &DefaultPrivilegeObject,
3243 grantee: &RoleId,
3244 ) -> Option<&AclMode> {
3245 self.privileges
3246 .get(object)
3247 .and_then(|privileges| privileges.get(grantee))
3248 .map(|privilege| &privilege.acl_mode)
3249 }
3250
3251 pub fn get_applicable_privileges(
3253 &self,
3254 role_id: RoleId,
3255 database_id: Option<DatabaseId>,
3256 schema_id: Option<SchemaId>,
3257 object_type: mz_sql::catalog::ObjectType,
3258 ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
3259 let privilege_object_type = if object_type.is_relation() {
3263 mz_sql::catalog::ObjectType::Table
3264 } else {
3265 object_type
3266 };
3267 let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
3268
3269 [
3273 DefaultPrivilegeObject {
3274 role_id,
3275 database_id,
3276 schema_id,
3277 object_type: privilege_object_type,
3278 },
3279 DefaultPrivilegeObject {
3280 role_id,
3281 database_id,
3282 schema_id: None,
3283 object_type: privilege_object_type,
3284 },
3285 DefaultPrivilegeObject {
3286 role_id,
3287 database_id: None,
3288 schema_id: None,
3289 object_type: privilege_object_type,
3290 },
3291 DefaultPrivilegeObject {
3292 role_id: RoleId::Public,
3293 database_id,
3294 schema_id,
3295 object_type: privilege_object_type,
3296 },
3297 DefaultPrivilegeObject {
3298 role_id: RoleId::Public,
3299 database_id,
3300 schema_id: None,
3301 object_type: privilege_object_type,
3302 },
3303 DefaultPrivilegeObject {
3304 role_id: RoleId::Public,
3305 database_id: None,
3306 schema_id: None,
3307 object_type: privilege_object_type,
3308 },
3309 ]
3310 .into_iter()
3311 .filter_map(|object| self.privileges.get(&object))
3312 .flat_map(|acl_map| acl_map.values())
3313 .fold(
3315 BTreeMap::new(),
3316 |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
3317 let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
3318 *accum_acl_mode |= *acl_mode;
3319 accum
3320 },
3321 )
3322 .into_iter()
3323 .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
3328 .filter(|(_, acl_mode)| !acl_mode.is_empty())
3330 .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
3331 grantee: *grantee,
3332 acl_mode,
3333 })
3334 }
3335
3336 pub fn iter(
3337 &self,
3338 ) -> impl Iterator<
3339 Item = (
3340 &DefaultPrivilegeObject,
3341 impl Iterator<Item = &DefaultPrivilegeAclItem>,
3342 ),
3343 > {
3344 self.privileges
3345 .iter()
3346 .map(|(object, acl_map)| (object, acl_map.values()))
3347 }
3348}
3349
3350#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3351pub struct ClusterConfig {
3352 pub variant: ClusterVariant,
3353 pub workload_class: Option<String>,
3354}
3355
3356impl ClusterConfig {
3357 pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
3358 match &self.variant {
3359 ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
3360 ClusterVariant::Unmanaged => None,
3361 }
3362 }
3363}
3364
3365impl From<ClusterConfig> for durable::ClusterConfig {
3366 fn from(config: ClusterConfig) -> Self {
3367 Self {
3368 variant: config.variant.into(),
3369 workload_class: config.workload_class,
3370 }
3371 }
3372}
3373
3374impl From<durable::ClusterConfig> for ClusterConfig {
3375 fn from(config: durable::ClusterConfig) -> Self {
3376 Self {
3377 variant: config.variant.into(),
3378 workload_class: config.workload_class,
3379 }
3380 }
3381}
3382
3383#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3384pub struct ClusterVariantManaged {
3385 pub size: String,
3386 pub availability_zones: Vec<String>,
3387 pub logging: ReplicaLogging,
3388 pub replication_factor: u32,
3389 pub optimizer_feature_overrides: OptimizerFeatureOverrides,
3390 pub schedule: ClusterSchedule,
3391}
3392
3393impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3394 fn from(managed: ClusterVariantManaged) -> Self {
3395 Self {
3396 size: managed.size,
3397 availability_zones: managed.availability_zones,
3398 logging: managed.logging,
3399 replication_factor: managed.replication_factor,
3400 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3401 schedule: managed.schedule,
3402 }
3403 }
3404}
3405
3406impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3407 fn from(managed: durable::ClusterVariantManaged) -> Self {
3408 Self {
3409 size: managed.size,
3410 availability_zones: managed.availability_zones,
3411 logging: managed.logging,
3412 replication_factor: managed.replication_factor,
3413 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3414 schedule: managed.schedule,
3415 }
3416 }
3417}
3418
3419#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3420pub enum ClusterVariant {
3421 Managed(ClusterVariantManaged),
3422 Unmanaged,
3423}
3424
3425impl From<ClusterVariant> for durable::ClusterVariant {
3426 fn from(variant: ClusterVariant) -> Self {
3427 match variant {
3428 ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3429 ClusterVariant::Unmanaged => Self::Unmanaged,
3430 }
3431 }
3432}
3433
3434impl From<durable::ClusterVariant> for ClusterVariant {
3435 fn from(variant: durable::ClusterVariant) -> Self {
3436 match variant {
3437 durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3438 durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3439 }
3440 }
3441}
3442
/// Exposes an in-memory [`Database`] through the SQL layer's
/// `CatalogDatabase` trait; every method reads a field of the database.
impl mz_sql::catalog::CatalogDatabase for Database {
    /// Returns the database name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the database id.
    fn id(&self) -> DatabaseId {
        self.id
    }

    /// Reports whether the database contains at least one schema, judged by
    /// the name index.
    fn has_schemas(&self) -> bool {
        !self.schemas_by_name.is_empty()
    }

    /// Returns the schema-name-to-id index of the database.
    fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
        &self.schemas_by_name
    }

    /// Returns every schema of the database as a trait object.
    // The `as` below only coerces a concrete reference to a trait object.
    #[allow(clippy::as_conversions)]
    fn schemas(&self) -> Vec<&dyn CatalogSchema> {
        self.schemas_by_id
            .values()
            .map(|schema| schema as &dyn CatalogSchema)
            .collect()
    }

    /// Returns the role that owns the database.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges stored on the database.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3477
/// Exposes an in-memory [`Schema`] through the SQL layer's `CatalogSchema`
/// trait.
impl mz_sql::catalog::CatalogSchema for Schema {
    /// Returns the database specifier taken from the schema's qualified name.
    fn database(&self) -> &ResolvedDatabaseSpecifier {
        &self.name.database
    }

    /// Returns the qualified name of the schema.
    fn name(&self) -> &QualifiedSchemaName {
        &self.name
    }

    /// Returns the schema specifier that identifies the schema.
    fn id(&self) -> &SchemaSpecifier {
        &self.id
    }

    /// Reports whether the `items` map is non-empty. Functions and types are
    /// kept in separate maps and are not considered here.
    fn has_items(&self) -> bool {
        !self.items.is_empty()
    }

    /// Iterates over the ids in `items`, then `functions`, then `types`.
    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
        Box::new(
            self.items
                .values()
                .chain(self.functions.values())
                .chain(self.types.values())
                .copied(),
        )
    }

    /// Returns the role that owns the schema.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges stored on the schema.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3513
/// Exposes an in-memory [`Role`] through the SQL layer's `CatalogRole` trait.
impl mz_sql::catalog::CatalogRole for Role {
    /// Returns the role name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the role id.
    fn id(&self) -> RoleId {
        self.id
    }

    /// Returns the raw membership map of the role.
    // NOTE(review): presumably maps role-id -> grantor-id; confirm against `RoleMembership`.
    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
        &self.membership.map
    }

    /// Returns the attributes of the role.
    fn attributes(&self) -> &RoleAttributes {
        &self.attributes
    }

    /// Returns the role's variable map, keyed by variable name.
    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
        &self.vars.map
    }
}
3535
/// Exposes an in-memory [`NetworkPolicy`] through the SQL layer's
/// `CatalogNetworkPolicy` trait; every method reads a field of the policy.
impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
    /// Returns the policy name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the policy id.
    fn id(&self) -> NetworkPolicyId {
        self.id
    }

    /// Returns the role that owns the policy.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges stored on the policy.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3553
/// Exposes an in-memory [`Cluster`] through the SQL layer's `CatalogCluster`
/// trait.
///
/// Several methods call a same-named method on `self`; inherent methods take
/// priority over trait methods in method resolution.
// NOTE(review): `is_managed` and `try_to_plan` only avoid self-recursion if
// `Cluster` defines inherent methods of those names; confirm they exist.
impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
    /// Returns the cluster name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the cluster id.
    fn id(&self) -> ClusterId {
        self.id
    }

    /// Returns the ids of the items bound to this cluster.
    fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
        &self.bound_objects
    }

    /// Returns the replica-name-to-id index of the cluster.
    fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
        &self.replica_id_by_name_
    }

    /// Returns every replica of the cluster as a trait object.
    // The `as` below only coerces a concrete reference to a trait object.
    #[allow(clippy::as_conversions)]
    fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
        self.replicas()
            .map(|replica| replica as &dyn CatalogClusterReplica)
            .collect()
    }

    /// Returns the replica with the given id.
    ///
    /// # Panics
    ///
    /// Panics with "catalog out of sync" when no such replica exists.
    fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
        self.replica(id).expect("catalog out of sync")
    }

    /// Returns the role that owns the cluster.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges stored on the cluster.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }

    /// Reports whether the cluster is managed.
    fn is_managed(&self) -> bool {
        self.is_managed()
    }

    /// Returns the size of a managed cluster, or `None` when unmanaged.
    fn managed_size(&self) -> Option<&str> {
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
            ClusterVariant::Unmanaged => None,
        }
    }

    /// Returns the schedule of a managed cluster, or `None` when unmanaged.
    fn schedule(&self) -> Option<&ClusterSchedule> {
        match &self.config.variant {
            ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
            ClusterVariant::Unmanaged => None,
        }
    }

    /// Attempts to build a `CreateClusterPlan` for this cluster.
    fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
        self.try_to_plan()
    }
}
3613
/// Exposes an in-memory [`ClusterReplica`] through the SQL layer's
/// `CatalogClusterReplica` trait.
impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
    /// Returns the replica name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the id of the cluster the replica belongs to.
    fn cluster_id(&self) -> ClusterId {
        self.cluster_id
    }

    /// Returns the replica id.
    fn replica_id(&self) -> ReplicaId {
        self.replica_id
    }

    /// Returns the role that owns the replica.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Reports whether the replica's configured location is internal.
    fn internal(&self) -> bool {
        self.config.location.internal()
    }
}
3635
3636impl mz_sql::catalog::CatalogItem for CatalogEntry {
/// Returns the qualified name; forwards to the inherent `CatalogEntry::name`.
fn name(&self) -> &QualifiedItemName {
    self.name()
}
3640
/// Returns the catalog item id; forwards to the inherent `CatalogEntry::id`.
fn id(&self) -> CatalogItemId {
    self.id()
}
3644
/// Returns the entry's global ids, boxing the iterator produced by the
/// inherent `CatalogEntry::global_ids`.
fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
    Box::new(self.global_ids())
}
3648
/// Returns the OID; forwards to the inherent `CatalogEntry::oid`.
fn oid(&self) -> u32 {
    self.oid()
}
3652
/// Returns the function behind this entry; forwards to the inherent
/// `CatalogEntry::func`, including its error.
fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
    self.func()
}
3656
/// Returns the source description; forwards to the inherent
/// `CatalogEntry::source_desc`, including its error.
fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
    self.source_desc()
}
3660
3661 fn connection(
3662 &self,
3663 ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3664 {
3665 Ok(self.connection()?.details.to_connection())
3666 }
3667
3668 fn create_sql(&self) -> &str {
3669 match self.item() {
3670 CatalogItem::Table(Table { create_sql, .. }) => {
3671 create_sql.as_deref().unwrap_or("<builtin>")
3672 }
3673 CatalogItem::Source(Source { create_sql, .. }) => {
3674 create_sql.as_deref().unwrap_or("<builtin>")
3675 }
3676 CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3677 CatalogItem::View(View { create_sql, .. }) => create_sql,
3678 CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3679 CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3680 CatalogItem::Type(Type { create_sql, .. }) => {
3681 create_sql.as_deref().unwrap_or("<builtin>")
3682 }
3683 CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3684 CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3685 CatalogItem::Func(_) => "<builtin>",
3686 CatalogItem::Log(_) => "<builtin>",
3687 CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
3688 }
3689 }
3690
/// Returns the item type reported by the underlying item's `typ()`.
fn item_type(&self) -> SqlCatalogItemType {
    self.item().typ()
}
3694
3695 fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3696 if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3697 Some((keys, *on))
3698 } else {
3699 None
3700 }
3701 }
3702
3703 fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3704 if let CatalogItem::Table(Table {
3705 data_source: TableDataSource::TableWrites { defaults },
3706 ..
3707 }) = self.item()
3708 {
3709 Some(defaults.as_slice())
3710 } else {
3711 None
3712 }
3713 }
3714
3715 fn replacement_target(&self) -> Option<CatalogItemId> {
3716 if let CatalogItem::MaterializedView(mv) = self.item() {
3717 mv.replacement_target
3718 } else {
3719 None
3720 }
3721 }
3722
3723 fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3724 if let CatalogItem::Type(Type { details, .. }) = self.item() {
3725 Some(details)
3726 } else {
3727 None
3728 }
3729 }
3730
3731 fn references(&self) -> &ResolvedIds {
3732 self.references()
3733 }
3734
3735 fn uses(&self) -> BTreeSet<CatalogItemId> {
3736 self.uses()
3737 }
3738
3739 fn referenced_by(&self) -> &[CatalogItemId] {
3740 self.referenced_by()
3741 }
3742
3743 fn used_by(&self) -> &[CatalogItemId] {
3744 self.used_by()
3745 }
3746
3747 fn subsource_details(
3748 &self,
3749 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3750 self.subsource_details()
3751 }
3752
3753 fn source_export_details(
3754 &self,
3755 ) -> Option<(
3756 CatalogItemId,
3757 &UnresolvedItemName,
3758 &SourceExportDetails,
3759 &SourceExportDataConfig<ReferencedConnection>,
3760 )> {
3761 self.source_export_details()
3762 }
3763
3764 fn is_progress_source(&self) -> bool {
3765 self.is_progress_source()
3766 }
3767
3768 fn progress_id(&self) -> Option<CatalogItemId> {
3769 self.progress_id()
3770 }
3771
3772 fn owner_id(&self) -> RoleId {
3773 self.owner_id
3774 }
3775
3776 fn privileges(&self) -> &PrivilegeMap {
3777 &self.privileges
3778 }
3779
3780 fn cluster_id(&self) -> Option<ClusterId> {
3781 self.item().cluster_id()
3782 }
3783
3784 fn at_version(
3785 &self,
3786 version: RelationVersionSelector,
3787 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3788 Box::new(CatalogCollectionEntry {
3789 entry: self.clone(),
3790 version,
3791 })
3792 }
3793
3794 fn latest_version(&self) -> Option<RelationVersion> {
3795 self.table().map(|t| t.desc.latest_version())
3796 }
3797}
3798
3799#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
3801pub struct StateUpdate {
3802 pub kind: StateUpdateKind,
3803 pub ts: Timestamp,
3804 pub diff: StateDiff,
3805}
3806
3807#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
3811pub enum StateUpdateKind {
3812 Role(durable::objects::Role),
3813 RoleAuth(durable::objects::RoleAuth),
3814 Database(durable::objects::Database),
3815 Schema(durable::objects::Schema),
3816 DefaultPrivilege(durable::objects::DefaultPrivilege),
3817 SystemPrivilege(MzAclItem),
3818 SystemConfiguration(durable::objects::SystemConfiguration),
3819 Cluster(durable::objects::Cluster),
3820 NetworkPolicy(durable::objects::NetworkPolicy),
3821 IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3822 ClusterReplica(durable::objects::ClusterReplica),
3823 SourceReferences(durable::objects::SourceReferences),
3824 SystemObjectMapping(durable::objects::SystemObjectMapping),
3825 TemporaryItem(TemporaryItem),
3829 Item(durable::objects::Item),
3830 Comment(durable::objects::Comment),
3831 AuditLog(durable::objects::AuditLog),
3832 StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3834 UnfinalizedShard(durable::objects::UnfinalizedShard),
3835}
3836
/// Direction of a state update: either a retraction or an addition.
///
/// The derived ordering follows declaration order, so `Retraction` sorts
/// before `Addition`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum StateDiff {
    /// The update removes the object.
    Retraction,
    /// The update adds the object.
    Addition,
}
3843
3844impl From<StateDiff> for Diff {
3845 fn from(diff: StateDiff) -> Self {
3846 match diff {
3847 StateDiff::Retraction => Diff::MINUS_ONE,
3848 StateDiff::Addition => Diff::ONE,
3849 }
3850 }
3851}
3852impl TryFrom<Diff> for StateDiff {
3853 type Error = String;
3854
3855 fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3856 match diff {
3857 Diff::MINUS_ONE => Ok(Self::Retraction),
3858 Diff::ONE => Ok(Self::Addition),
3859 diff => Err(format!("invalid diff {diff}")),
3860 }
3861 }
3862}
3863
3864#[derive(Debug, Clone, Ord, PartialOrd, PartialEq, Eq)]
3866pub struct TemporaryItem {
3867 pub id: CatalogItemId,
3868 pub oid: u32,
3869 pub global_id: GlobalId,
3870 pub schema_id: SchemaId,
3871 pub name: String,
3872 pub conn_id: Option<ConnectionId>,
3873 pub create_sql: String,
3874 pub owner_id: RoleId,
3875 pub privileges: Vec<MzAclItem>,
3876 pub extra_versions: BTreeMap<RelationVersion, GlobalId>,
3877}
3878
3879impl From<CatalogEntry> for TemporaryItem {
3880 fn from(entry: CatalogEntry) -> Self {
3881 let conn_id = entry.conn_id().cloned();
3882 let (create_sql, global_id, extra_versions) = entry.item.to_serialized();
3883
3884 TemporaryItem {
3885 id: entry.id,
3886 oid: entry.oid,
3887 global_id,
3888 schema_id: entry.name.qualifiers.schema_spec.into(),
3889 name: entry.name.item,
3890 conn_id,
3891 create_sql,
3892 owner_id: entry.owner_id,
3893 privileges: entry.privileges.into_all_values().collect(),
3894 extra_versions,
3895 }
3896 }
3897}
3898
3899impl TemporaryItem {
3900 pub fn item_type(&self) -> CatalogItemType {
3901 item_type(&self.create_sql)
3902 }
3903}
3904
3905#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
3907pub enum BootstrapStateUpdateKind {
3908 Role(durable::objects::Role),
3909 RoleAuth(durable::objects::RoleAuth),
3910 Database(durable::objects::Database),
3911 Schema(durable::objects::Schema),
3912 DefaultPrivilege(durable::objects::DefaultPrivilege),
3913 SystemPrivilege(MzAclItem),
3914 SystemConfiguration(durable::objects::SystemConfiguration),
3915 Cluster(durable::objects::Cluster),
3916 NetworkPolicy(durable::objects::NetworkPolicy),
3917 IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3918 ClusterReplica(durable::objects::ClusterReplica),
3919 SourceReferences(durable::objects::SourceReferences),
3920 SystemObjectMapping(durable::objects::SystemObjectMapping),
3921 Item(durable::objects::Item),
3922 Comment(durable::objects::Comment),
3923 AuditLog(durable::objects::AuditLog),
3924 StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3926 UnfinalizedShard(durable::objects::UnfinalizedShard),
3927}
3928
3929impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3930 fn from(value: BootstrapStateUpdateKind) -> Self {
3931 match value {
3932 BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3933 BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3934 BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3935 BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3936 BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3937 StateUpdateKind::DefaultPrivilege(kind)
3938 }
3939 BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3940 StateUpdateKind::SystemPrivilege(kind)
3941 }
3942 BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3943 StateUpdateKind::SystemConfiguration(kind)
3944 }
3945 BootstrapStateUpdateKind::SourceReferences(kind) => {
3946 StateUpdateKind::SourceReferences(kind)
3947 }
3948 BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3949 BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3950 BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3951 StateUpdateKind::IntrospectionSourceIndex(kind)
3952 }
3953 BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3954 BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3955 StateUpdateKind::SystemObjectMapping(kind)
3956 }
3957 BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3958 BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3959 BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3960 BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3961 StateUpdateKind::StorageCollectionMetadata(kind)
3962 }
3963 BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3964 StateUpdateKind::UnfinalizedShard(kind)
3965 }
3966 }
3967 }
3968}
3969
3970impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3971 type Error = TemporaryItem;
3972
3973 fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3974 match value {
3975 StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3976 StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3977 StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3978 StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3979 StateUpdateKind::DefaultPrivilege(kind) => {
3980 Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3981 }
3982 StateUpdateKind::SystemPrivilege(kind) => {
3983 Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3984 }
3985 StateUpdateKind::SystemConfiguration(kind) => {
3986 Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3987 }
3988 StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3989 StateUpdateKind::NetworkPolicy(kind) => {
3990 Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3991 }
3992 StateUpdateKind::IntrospectionSourceIndex(kind) => {
3993 Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3994 }
3995 StateUpdateKind::ClusterReplica(kind) => {
3996 Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3997 }
3998 StateUpdateKind::SourceReferences(kind) => {
3999 Ok(BootstrapStateUpdateKind::SourceReferences(kind))
4000 }
4001 StateUpdateKind::SystemObjectMapping(kind) => {
4002 Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
4003 }
4004 StateUpdateKind::TemporaryItem(kind) => Err(kind),
4005 StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
4006 StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
4007 StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
4008 StateUpdateKind::StorageCollectionMetadata(kind) => {
4009 Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
4010 }
4011 StateUpdateKind::UnfinalizedShard(kind) => {
4012 Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
4013 }
4014 }
4015 }
4016}