1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18
19use chrono::{DateTime, Utc};
20use mz_adapter_types::compaction::CompactionWindow;
21use mz_adapter_types::connection::ConnectionId;
22use mz_compute_client::logging::LogVariant;
23use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
24use mz_controller_types::{ClusterId, ReplicaId};
25use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
26use mz_ore::collections::CollectionExt;
27use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
28use mz_repr::network_policy_id::NetworkPolicyId;
29use mz_repr::optimize::OptimizerFeatureOverrides;
30use mz_repr::refresh_schedule::RefreshSchedule;
31use mz_repr::role_id::RoleId;
32use mz_repr::{
33 CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
34 RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
35};
36use mz_sql::ast::display::AstDisplay;
37use mz_sql::ast::{
38 ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
39 UnresolvedItemName, Value, WithOptionValue,
40};
41use mz_sql::catalog::{
42 CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
43 CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogType,
44 CatalogTypeDetails, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference,
45 RoleAttributes, RoleMembership, RoleVars, SystemObjectType,
46};
47use mz_sql::names::{
48 Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
49 QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
50};
51use mz_sql::plan::{
52 ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
53 CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
54 HirRelationExpr, NetworkPolicyRule, PlanError, WebhookBodyFormat, WebhookHeaders,
55 WebhookValidation,
56};
57use mz_sql::rbac;
58use mz_sql::session::vars::OwnedVarInput;
59use mz_storage_client::controller::IntrospectionType;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
62use mz_storage_types::sources::load_generator::LoadGenerator;
63use mz_storage_types::sources::{
64 GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
65 SourceExportDetails, Timeline,
66};
67use serde::ser::SerializeSeq;
68use serde::{Deserialize, Serialize};
69use timely::progress::Antichain;
70use tracing::debug;
71
72use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
73use crate::durable;
74use crate::durable::objects::item_type;
75
/// In-place counterpart to [`From`]: refreshes an existing value from a `T`.
///
/// The `From<T>` supertrait guarantees that every implementor can also be built from scratch
/// out of the same `T`.
pub trait UpdateFrom<T>: From<T> {
    /// Consumes `from` and uses it to update `self`.
    fn update_from(&mut self, from: T);
}
80
/// Catalog representation of a database, convertible to and from `durable::Database`.
///
/// The schema lookup maps (`schemas_by_id`, `schemas_by_name`) have no counterpart in
/// `durable::Database`: the conversions in this file drop them on the way out and create them
/// empty on the way in.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    /// Name of the database.
    pub name: String,
    /// Identifier of the database.
    pub id: DatabaseId,
    /// OID of the database.
    pub oid: u32,
    /// Schemas keyed by [`SchemaId`]; the keys are serialized as strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Lookup from a name to a [`SchemaId`].
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    /// Role that owns the database.
    pub owner_id: RoleId,
    /// Privileges held on the database.
    pub privileges: PrivilegeMap,
}
92
93impl From<Database> for durable::Database {
94 fn from(database: Database) -> durable::Database {
95 durable::Database {
96 id: database.id,
97 oid: database.oid,
98 name: database.name,
99 owner_id: database.owner_id,
100 privileges: database.privileges.into_all_values().collect(),
101 }
102 }
103}
104
105impl From<durable::Database> for Database {
106 fn from(
107 durable::Database {
108 id,
109 oid,
110 name,
111 owner_id,
112 privileges,
113 }: durable::Database,
114 ) -> Database {
115 Database {
116 id,
117 oid,
118 schemas_by_id: BTreeMap::new(),
119 schemas_by_name: BTreeMap::new(),
120 name,
121 owner_id,
122 privileges: PrivilegeMap::from_mz_acl_items(privileges),
123 }
124 }
125}
126
127impl UpdateFrom<durable::Database> for Database {
128 fn update_from(
129 &mut self,
130 durable::Database {
131 id,
132 oid,
133 name,
134 owner_id,
135 privileges,
136 }: durable::Database,
137 ) {
138 self.id = id;
139 self.oid = oid;
140 self.name = name;
141 self.owner_id = owner_id;
142 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
143 }
144}
145
/// Catalog representation of a schema, convertible to and from `durable::Schema`.
///
/// The `items`, `functions` and `types` maps have no counterpart in `durable::Schema`: the
/// conversions in this file drop them on the way out and create them empty on the way in.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
    /// Qualified name: the owning database specifier plus the schema's own name.
    pub name: QualifiedSchemaName,
    /// Specifier identifying the schema.
    pub id: SchemaSpecifier,
    /// OID of the schema.
    pub oid: u32,
    /// Lookup from a name to a [`CatalogItemId`] — presumably everything that is neither a
    /// function nor a type; confirm against the code that populates these maps.
    pub items: BTreeMap<String, CatalogItemId>,
    /// Lookup from a name to a [`CatalogItemId`] for functions.
    pub functions: BTreeMap<String, CatalogItemId>,
    /// Lookup from a name to a [`CatalogItemId`] for types.
    pub types: BTreeMap<String, CatalogItemId>,
    /// Role that owns the schema.
    pub owner_id: RoleId,
    /// Privileges held on the schema.
    pub privileges: PrivilegeMap,
}
157
158impl From<Schema> for durable::Schema {
159 fn from(schema: Schema) -> durable::Schema {
160 durable::Schema {
161 id: schema.id.into(),
162 oid: schema.oid,
163 name: schema.name.schema,
164 database_id: schema.name.database.id(),
165 owner_id: schema.owner_id,
166 privileges: schema.privileges.into_all_values().collect(),
167 }
168 }
169}
170
171impl From<durable::Schema> for Schema {
172 fn from(
173 durable::Schema {
174 id,
175 oid,
176 name,
177 database_id,
178 owner_id,
179 privileges,
180 }: durable::Schema,
181 ) -> Schema {
182 Schema {
183 name: QualifiedSchemaName {
184 database: database_id.into(),
185 schema: name,
186 },
187 id: id.into(),
188 oid,
189 items: BTreeMap::new(),
190 functions: BTreeMap::new(),
191 types: BTreeMap::new(),
192 owner_id,
193 privileges: PrivilegeMap::from_mz_acl_items(privileges),
194 }
195 }
196}
197
198impl UpdateFrom<durable::Schema> for Schema {
199 fn update_from(
200 &mut self,
201 durable::Schema {
202 id,
203 oid,
204 name,
205 database_id,
206 owner_id,
207 privileges,
208 }: durable::Schema,
209 ) {
210 self.name = QualifiedSchemaName {
211 database: database_id.into(),
212 schema: name,
213 };
214 self.id = id.into();
215 self.oid = oid;
216 self.owner_id = owner_id;
217 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
218 }
219}
220
/// Catalog representation of a role; carries the same fields as `durable::Role`.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    /// Name of the role.
    pub name: String,
    /// Identifier of the role.
    pub id: RoleId,
    /// OID of the role.
    pub oid: u32,
    /// Attributes of the role.
    pub attributes: RoleAttributes,
    /// Membership information of the role.
    pub membership: RoleMembership,
    /// Variables stored for the role; iterate them with [`Role::vars`].
    pub vars: RoleVars,
}
230
231impl Role {
232 pub fn is_user(&self) -> bool {
233 self.id.is_user()
234 }
235
236 pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
237 self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
238 }
239}
240
241impl From<Role> for durable::Role {
242 fn from(role: Role) -> durable::Role {
243 durable::Role {
244 id: role.id,
245 oid: role.oid,
246 name: role.name,
247 attributes: role.attributes,
248 membership: role.membership,
249 vars: role.vars,
250 }
251 }
252}
253
254impl From<durable::Role> for Role {
255 fn from(
256 durable::Role {
257 id,
258 oid,
259 name,
260 attributes,
261 membership,
262 vars,
263 }: durable::Role,
264 ) -> Self {
265 Role {
266 name,
267 id,
268 oid,
269 attributes,
270 membership,
271 vars,
272 }
273 }
274}
275
276impl UpdateFrom<durable::Role> for Role {
277 fn update_from(
278 &mut self,
279 durable::Role {
280 id,
281 oid,
282 name,
283 attributes,
284 membership,
285 vars,
286 }: durable::Role,
287 ) {
288 self.id = id;
289 self.oid = oid;
290 self.name = name;
291 self.attributes = attributes;
292 self.membership = membership;
293 self.vars = vars;
294 }
295}
296
/// Catalog representation of a role's authentication data; carries the same fields as
/// `durable::RoleAuth`.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RoleAuth {
    /// Role this authentication data belongs to.
    pub role_id: RoleId,
    /// Password hash for the role, if one is set.
    pub password_hash: Option<String>,
    /// Timestamp of the last update. NOTE(review): the unit/epoch is not visible here — confirm.
    pub updated_at: u64,
}
303
304impl From<RoleAuth> for durable::RoleAuth {
305 fn from(role_auth: RoleAuth) -> durable::RoleAuth {
306 durable::RoleAuth {
307 role_id: role_auth.role_id,
308 password_hash: role_auth.password_hash,
309 updated_at: role_auth.updated_at,
310 }
311 }
312}
313
314impl From<durable::RoleAuth> for RoleAuth {
315 fn from(
316 durable::RoleAuth {
317 role_id,
318 password_hash,
319 updated_at,
320 }: durable::RoleAuth,
321 ) -> RoleAuth {
322 RoleAuth {
323 role_id,
324 password_hash,
325 updated_at,
326 }
327 }
328}
329
330impl UpdateFrom<durable::RoleAuth> for RoleAuth {
331 fn update_from(&mut self, from: durable::RoleAuth) {
332 self.role_id = from.role_id;
333 self.password_hash = from.password_hash;
334 }
335}
336
/// Catalog representation of a cluster, convertible to and from `durable::Cluster`.
///
/// Only `id`, `name`, `owner_id`, `privileges` and `config` have a durable counterpart; the
/// remaining fields are created empty by `From<durable::Cluster>` and dropped when converting
/// back.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    /// Name of the cluster; also used by [`Cluster::role`] to recognize builtin clusters.
    pub name: String,
    /// Identifier of the cluster.
    pub id: ClusterId,
    /// Configuration of the cluster, including its managed/unmanaged variant.
    pub config: ClusterConfig,
    /// Lookup from a [`LogVariant`] to a [`GlobalId`]; excluded from serialization.
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    /// Set of catalog item ids associated with this cluster.
    /// NOTE(review): presumably the items installed on the cluster — confirm with the writers.
    pub bound_objects: BTreeSet<CatalogItemId>,
    /// Lookup from a replica name to its [`ReplicaId`]; read through [`Cluster::replica_id`].
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    /// Replicas keyed by [`ReplicaId`]; the keys are serialized as strings. Read through
    /// [`Cluster::replica`] and [`Cluster::replicas`].
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    /// Role that owns the cluster.
    pub owner_id: RoleId,
    /// Privileges held on the cluster.
    pub privileges: PrivilegeMap,
}
353
354impl Cluster {
355 pub fn role(&self) -> ClusterRole {
357 if self.name == MZ_SYSTEM_CLUSTER.name {
360 ClusterRole::SystemCritical
361 } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
362 ClusterRole::System
363 } else {
364 ClusterRole::User
365 }
366 }
367
368 pub fn is_managed(&self) -> bool {
370 matches!(self.config.variant, ClusterVariant::Managed { .. })
371 }
372
373 pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
375 self.replicas().filter(|r| !r.config.location.internal())
376 }
377
378 pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
380 self.replicas_by_id_.values()
381 }
382
383 pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
385 self.replicas_by_id_.get(&replica_id)
386 }
387
388 pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
390 self.replica_id_by_name_.get(name).copied()
391 }
392
393 pub fn availability_zones(&self) -> Option<&[String]> {
395 match &self.config.variant {
396 ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
397 ClusterVariant::Unmanaged => None,
398 }
399 }
400
401 pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
402 let name = self.name.clone();
403 let variant = match &self.config.variant {
404 ClusterVariant::Managed(ClusterVariantManaged {
405 size,
406 availability_zones,
407 logging,
408 replication_factor,
409 optimizer_feature_overrides,
410 schedule,
411 }) => {
412 let introspection = match logging {
413 ReplicaLogging {
414 log_logging,
415 interval: Some(interval),
416 } => Some(ComputeReplicaIntrospectionConfig {
417 debugging: *log_logging,
418 interval: interval.clone(),
419 }),
420 ReplicaLogging {
421 log_logging: _,
422 interval: None,
423 } => None,
424 };
425 let compute = ComputeReplicaConfig { introspection };
426 CreateClusterVariant::Managed(CreateClusterManagedPlan {
427 replication_factor: replication_factor.clone(),
428 size: size.clone(),
429 availability_zones: availability_zones.clone(),
430 compute,
431 optimizer_feature_overrides: optimizer_feature_overrides.clone(),
432 schedule: schedule.clone(),
433 })
434 }
435 ClusterVariant::Unmanaged => {
436 return Err(PlanError::Unsupported {
439 feature: "SHOW CREATE for unmanaged clusters".to_string(),
440 discussion_no: None,
441 });
442 }
443 };
444 let workload_class = self.config.workload_class.clone();
445 Ok(CreateClusterPlan {
446 name,
447 variant,
448 workload_class,
449 })
450 }
451}
452
453impl From<Cluster> for durable::Cluster {
454 fn from(cluster: Cluster) -> durable::Cluster {
455 durable::Cluster {
456 id: cluster.id,
457 name: cluster.name,
458 owner_id: cluster.owner_id,
459 privileges: cluster.privileges.into_all_values().collect(),
460 config: cluster.config.into(),
461 }
462 }
463}
464
465impl From<durable::Cluster> for Cluster {
466 fn from(
467 durable::Cluster {
468 id,
469 name,
470 owner_id,
471 privileges,
472 config,
473 }: durable::Cluster,
474 ) -> Self {
475 Cluster {
476 name: name.clone(),
477 id,
478 bound_objects: BTreeSet::new(),
479 log_indexes: BTreeMap::new(),
480 replica_id_by_name_: BTreeMap::new(),
481 replicas_by_id_: BTreeMap::new(),
482 owner_id,
483 privileges: PrivilegeMap::from_mz_acl_items(privileges),
484 config: config.into(),
485 }
486 }
487}
488
489impl UpdateFrom<durable::Cluster> for Cluster {
490 fn update_from(
491 &mut self,
492 durable::Cluster {
493 id,
494 name,
495 owner_id,
496 privileges,
497 config,
498 }: durable::Cluster,
499 ) {
500 self.id = id;
501 self.name = name;
502 self.owner_id = owner_id;
503 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
504 self.config = config.into();
505 }
506}
507
/// Catalog representation of a cluster replica, convertible into `durable::ClusterReplica`.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    /// Name of the replica.
    pub name: String,
    /// Cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// Identifier of the replica.
    pub replica_id: ReplicaId,
    /// Configuration of the replica.
    pub config: ReplicaConfig,
    /// Role that owns the replica.
    pub owner_id: RoleId,
}
516
517impl From<ClusterReplica> for durable::ClusterReplica {
518 fn from(replica: ClusterReplica) -> durable::ClusterReplica {
519 durable::ClusterReplica {
520 cluster_id: replica.cluster_id,
521 replica_id: replica.replica_id,
522 name: replica.name,
523 config: replica.config.into(),
524 owner_id: replica.owner_id,
525 }
526 }
527}
528
/// A [`ClusterStatus`] paired with a UTC timestamp.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    /// The reported status.
    pub status: ClusterStatus,
    /// Time associated with `status`.
    /// NOTE(review): presumably when the status was observed — confirm with the writers.
    pub time: DateTime<Utc>,
}
534
/// A list of [`SourceReference`]s together with the time the list was last updated.
///
/// Convertible to and from `mz_sql::plan::SourceReferences`, from `durable::SourceReferences`,
/// and into the durable form via [`SourceReferences::to_durable`].
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
    /// Timestamp of the last update. NOTE(review): the unit/epoch is not visible here — confirm.
    pub updated_at: u64,
    /// The references themselves.
    pub references: Vec<SourceReference>,
}
540
/// A single named reference belonging to a [`SourceReferences`] list.
///
/// Carries the same fields as `durable::SourceReference` and `mz_sql::plan::SourceReference`.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
    /// Name of the referenced object.
    pub name: String,
    /// Optional namespace qualifying `name`.
    pub namespace: Option<String>,
    /// Column names of the referenced object.
    pub columns: Vec<String>,
}
547
548impl From<SourceReference> for durable::SourceReference {
549 fn from(source_reference: SourceReference) -> durable::SourceReference {
550 durable::SourceReference {
551 name: source_reference.name,
552 namespace: source_reference.namespace,
553 columns: source_reference.columns,
554 }
555 }
556}
557
558impl SourceReferences {
559 pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
560 durable::SourceReferences {
561 source_id,
562 updated_at: self.updated_at,
563 references: self.references.into_iter().map(Into::into).collect(),
564 }
565 }
566}
567
568impl From<durable::SourceReference> for SourceReference {
569 fn from(source_reference: durable::SourceReference) -> SourceReference {
570 SourceReference {
571 name: source_reference.name,
572 namespace: source_reference.namespace,
573 columns: source_reference.columns,
574 }
575 }
576}
577
578impl From<durable::SourceReferences> for SourceReferences {
579 fn from(source_references: durable::SourceReferences) -> SourceReferences {
580 SourceReferences {
581 updated_at: source_references.updated_at,
582 references: source_references
583 .references
584 .into_iter()
585 .map(|source_reference| source_reference.into())
586 .collect(),
587 }
588 }
589}
590
591impl From<mz_sql::plan::SourceReference> for SourceReference {
592 fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
593 SourceReference {
594 name: source_reference.name,
595 namespace: source_reference.namespace,
596 columns: source_reference.columns,
597 }
598 }
599}
600
601impl From<mz_sql::plan::SourceReferences> for SourceReferences {
602 fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
603 SourceReferences {
604 updated_at: source_references.updated_at,
605 references: source_references
606 .references
607 .into_iter()
608 .map(|source_reference| source_reference.into())
609 .collect(),
610 }
611 }
612}
613
614impl From<SourceReferences> for mz_sql::plan::SourceReferences {
615 fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
616 mz_sql::plan::SourceReferences {
617 updated_at: source_references.updated_at,
618 references: source_references
619 .references
620 .into_iter()
621 .map(|source_reference| source_reference.into())
622 .collect(),
623 }
624 }
625}
626
627impl From<SourceReference> for mz_sql::plan::SourceReference {
628 fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
629 mz_sql::plan::SourceReference {
630 name: source_reference.name,
631 namespace: source_reference.namespace,
632 columns: source_reference.columns,
633 }
634 }
635}
636
/// A catalog item together with its identity, ownership and dependency bookkeeping.
///
/// Convertible into `durable::Item`; that conversion does not carry over `referenced_by` or
/// `used_by`.
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    /// The item stored in this entry.
    pub item: CatalogItem,
    /// Ids of other catalog items, exposed through `referenced_by()`; excluded from
    /// serialization.
    #[serde(skip)]
    pub referenced_by: Vec<CatalogItemId>,
    /// Ids of other catalog items, exposed through `used_by()`; excluded from serialization.
    /// NOTE(review): how this differs from `referenced_by` is not visible here — confirm.
    #[serde(skip)]
    pub used_by: Vec<CatalogItemId>,
    /// Identifier of the entry.
    pub id: CatalogItemId,
    /// OID of the entry.
    pub oid: u32,
    /// Qualified name of the entry.
    pub name: QualifiedItemName,
    /// Role that owns the entry.
    pub owner_id: RoleId,
    /// Privileges held on the entry.
    pub privileges: PrivilegeMap,
}
653
/// A [`CatalogEntry`] paired with the relation version it should be viewed at.
///
/// Dereferences to the wrapped [`CatalogEntry`]; `version` is used when resolving the relation
/// description and the [`GlobalId`].
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
    /// The wrapped catalog entry.
    pub entry: CatalogEntry,
    /// Version selector applied when looking up the description and global id.
    pub version: RelationVersionSelector,
}
676
677impl CatalogCollectionEntry {
678 pub fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
679 self.item().desc(name, self.version)
680 }
681
682 pub fn desc_opt(&self) -> Option<Cow<'_, RelationDesc>> {
683 self.item().desc_opt(self.version)
684 }
685}
686
687impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
688 fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
689 CatalogCollectionEntry::desc(self, name)
690 }
691
692 fn global_id(&self) -> GlobalId {
693 self.entry
694 .item()
695 .global_id_for_version(self.version)
696 .expect("catalog corruption, missing version!")
697 }
698}
699
700impl Deref for CatalogCollectionEntry {
701 type Target = CatalogEntry;
702
703 fn deref(&self) -> &CatalogEntry {
704 &self.entry
705 }
706}
707
// Implements the SQL-layer catalog item interface by forwarding to the wrapped `CatalogEntry`.
// Only `at_version` involves the selected version: it clones the entry and pairs it with the
// requested selector.
impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
    fn name(&self) -> &QualifiedItemName {
        self.entry.name()
    }

    fn id(&self) -> CatalogItemId {
        self.entry.id()
    }

    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        // Boxed because the trait returns a trait object.
        Box::new(self.entry.global_ids())
    }

    fn oid(&self) -> u32 {
        self.entry.oid()
    }

    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.entry.func()
    }

    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.entry.source_desc()
    }

    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        // Fully qualified so that the trait's `connection` on the entry is the one called.
        mz_sql::catalog::CatalogItem::connection(&self.entry)
    }

    fn create_sql(&self) -> &str {
        self.entry.create_sql()
    }

    fn item_type(&self) -> SqlCatalogItemType {
        self.entry.item_type()
    }

    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        self.entry.index_details()
    }

    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        self.entry.writable_table_details()
    }

    fn replacement_target(&self) -> Option<CatalogItemId> {
        self.entry.replacement_target()
    }

    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        self.entry.type_details()
    }

    fn references(&self) -> &ResolvedIds {
        self.entry.references()
    }

    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.entry.uses()
    }

    fn referenced_by(&self) -> &[CatalogItemId] {
        self.entry.referenced_by()
    }

    fn used_by(&self) -> &[CatalogItemId] {
        self.entry.used_by()
    }

    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.entry.subsource_details()
    }

    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.entry.source_export_details()
    }

    fn is_progress_source(&self) -> bool {
        self.entry.is_progress_source()
    }

    fn progress_id(&self) -> Option<CatalogItemId> {
        self.entry.progress_id()
    }

    fn owner_id(&self) -> RoleId {
        // The entry's accessor yields a reference; the trait wants the id by value.
        *self.entry.owner_id()
    }

    fn privileges(&self) -> &PrivilegeMap {
        self.entry.privileges()
    }

    fn cluster_id(&self) -> Option<ClusterId> {
        self.entry.item().cluster_id()
    }

    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        // Same entry (cloned), viewed at the requested version instead of `self.version`.
        Box::new(CatalogCollectionEntry {
            entry: self.entry.clone(),
            version,
        })
    }

    fn latest_version(&self) -> Option<RelationVersion> {
        self.entry.latest_version()
    }
}
831
/// The kinds of item a [`CatalogEntry`] can hold; each variant wraps that kind's definition.
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    /// A table.
    Table(Table),
    /// A source.
    Source(Source),
    /// A log.
    Log(Log),
    /// A view.
    View(View),
    /// A materialized view.
    MaterializedView(MaterializedView),
    /// A sink.
    Sink(Sink),
    /// An index.
    Index(Index),
    /// A type.
    Type(Type),
    /// A function.
    Func(Func),
    /// A secret.
    Secret(Secret),
    /// A connection.
    Connection(Connection),
    /// A continual task.
    ContinualTask(ContinualTask),
}
847
848impl From<CatalogEntry> for durable::Item {
849 fn from(entry: CatalogEntry) -> durable::Item {
850 let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
851 durable::Item {
852 id: entry.id,
853 oid: entry.oid,
854 global_id,
855 schema_id: entry.name.qualifiers.schema_spec.into(),
856 name: entry.name.item,
857 create_sql,
858 owner_id: entry.owner_id,
859 privileges: entry.privileges.into_all_values().collect(),
860 extra_versions,
861 }
862 }
863}
864
/// Catalog definition of a table.
#[derive(Debug, Clone, Serialize)]
pub struct Table {
    /// SQL that defines the table, if any.
    pub create_sql: Option<String>,
    /// Versioned relation description of the table.
    pub desc: VersionedRelationDesc,
    /// Maps each [`RelationVersion`] of the table to the [`GlobalId`] of its collection; the
    /// keys are serialized as strings. `global_id_writes` expects at least one entry.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// Optional connection id; excluded from serialization.
    /// NOTE(review): presumably set for objects tied to one connection — confirm.
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    /// Ids resolved while planning the table's definition.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window, if one was specified.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the table is a retained-metrics object.
    pub is_retained_metrics_object: bool,
    /// Where the table's data comes from; also determines [`Table::timeline`].
    pub data_source: TableDataSource,
}
889
890impl Table {
891 pub fn timeline(&self) -> Timeline {
892 match &self.data_source {
893 TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
896 TableDataSource::DataSource { timeline, .. } => timeline.clone(),
897 }
898 }
899
900 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
902 self.collections.values().copied()
903 }
904
905 pub fn global_id_writes(&self) -> GlobalId {
907 *self
908 .collections
909 .last_key_value()
910 .expect("at least one version of a table")
911 .1
912 }
913
914 pub fn collection_descs(
916 &self,
917 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
918 self.collections.iter().map(|(version, gid)| {
919 let desc = self
920 .desc
921 .at_version(RelationVersionSelector::Specific(*version));
922 (*gid, *version, desc)
923 })
924 }
925
926 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
928 let (version, _gid) = self
929 .collections
930 .iter()
931 .find(|(_version, gid)| *gid == id)
932 .expect("GlobalId to exist");
933 self.desc
934 .at_version(RelationVersionSelector::Specific(*version))
935 }
936}
937
/// Where a [`Table`] gets its data from.
#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
    /// The table is populated through table writes; [`Table::timeline`] reports
    /// `Timeline::EpochMilliseconds` for this variant.
    TableWrites {
        /// Expressions associated with the table's columns; excluded from serialization.
        /// NOTE(review): presumably one default expression per column — confirm.
        #[serde(skip)]
        defaults: Vec<Expr<Aug>>,
    },

    /// The table is populated from a data source.
    DataSource {
        /// Description of the data source.
        desc: DataSourceDesc,
        /// Timeline of the data source; returned as-is by [`Table::timeline`].
        timeline: Timeline,
    },
}
953
/// Describes the data source backing a [`Source`] or a data-source-backed [`Table`].
#[derive(Debug, Clone, Serialize)]
pub enum DataSourceDesc {
    /// An ingestion described by a [`SourceDesc`], running on the given cluster.
    Ingestion {
        /// Description of the ingestion.
        desc: SourceDesc<ReferencedConnection>,
        /// Cluster the ingestion is assigned to.
        cluster_id: ClusterId,
    },
    /// An ingestion that additionally carries its own export configuration and a reference to
    /// a progress subsource.
    OldSyntaxIngestion {
        /// Description of the ingestion.
        desc: SourceDesc<ReferencedConnection>,
        /// Cluster the ingestion is assigned to.
        cluster_id: ClusterId,
        /// Catalog item id of the associated progress subsource.
        progress_subsource: CatalogItemId,
        /// Data configuration (encoding, envelope) of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
        /// Details of the export.
        details: SourceExportDetails,
    },
    /// An export of another ingestion.
    IngestionExport {
        /// Catalog item id of the ingestion being exported from.
        ingestion_id: CatalogItemId,
        /// Name of the referenced external object.
        external_reference: UnresolvedItemName,
        /// Details of the export.
        details: SourceExportDetails,
        /// Data configuration (encoding, envelope) of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// An introspection collection of the given type.
    Introspection(IntrospectionType),
    /// A progress collection.
    Progress,
    /// A webhook source.
    Webhook {
        /// Optional validation applied to requests.
        validate_using: Option<WebhookValidation>,
        /// Format of the request body.
        body_format: WebhookBodyFormat,
        /// Header handling configuration.
        headers: WebhookHeaders,
        /// Cluster the webhook is assigned to.
        cluster_id: ClusterId,
    },
}
1000
1001impl DataSourceDesc {
1002 pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1004 match &self {
1005 DataSourceDesc::Ingestion { .. } => (None, None),
1006 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1007 match &data_config.encoding.as_ref() {
1008 Some(encoding) => match &encoding.key {
1009 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1010 None => (None, Some(encoding.value.type_())),
1011 },
1012 None => (None, None),
1013 }
1014 }
1015 DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1016 Some(encoding) => match &encoding.key {
1017 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1018 None => (None, Some(encoding.value.type_())),
1019 },
1020 None => (None, None),
1021 },
1022 DataSourceDesc::Introspection(_)
1023 | DataSourceDesc::Webhook { .. }
1024 | DataSourceDesc::Progress => (None, None),
1025 }
1026 }
1027
1028 pub fn envelope(&self) -> Option<&str> {
1030 fn envelope_string(envelope: &SourceEnvelope) -> &str {
1035 match envelope {
1036 SourceEnvelope::None(_) => "none",
1037 SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1038 mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1039 mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1040 "debezium"
1044 }
1045 mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1046 "upsert-value-err-inline"
1047 }
1048 },
1049 SourceEnvelope::CdcV2 => {
1050 "materialize"
1053 }
1054 }
1055 }
1056
1057 match self {
1058 DataSourceDesc::Ingestion { .. } => None,
1063 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1064 Some(envelope_string(&data_config.envelope))
1065 }
1066 DataSourceDesc::IngestionExport { data_config, .. } => {
1067 Some(envelope_string(&data_config.envelope))
1068 }
1069 DataSourceDesc::Introspection(_)
1070 | DataSourceDesc::Webhook { .. }
1071 | DataSourceDesc::Progress => None,
1072 }
1073 }
1074}
1075
/// Catalog definition of a source.
#[derive(Debug, Clone, Serialize)]
pub struct Source {
    /// SQL that defines the source, if any.
    pub create_sql: Option<String>,
    /// [`GlobalId`] of the source; returned by [`Source::global_id`].
    pub global_id: GlobalId,
    /// Description of where the source's data comes from; excluded from serialization.
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    /// Relation description of the source.
    pub desc: RelationDesc,
    /// Timeline of the source.
    pub timeline: Timeline,
    /// Ids resolved while planning the source's definition.
    pub resolved_ids: ResolvedIds,
    /// Custom compaction window, if one was specified.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the source is a retained-metrics object.
    pub is_retained_metrics_object: bool,
}
1099
1100impl Source {
1101 pub fn new(
1108 plan: CreateSourcePlan,
1109 global_id: GlobalId,
1110 resolved_ids: ResolvedIds,
1111 custom_logical_compaction_window: Option<CompactionWindow>,
1112 is_retained_metrics_object: bool,
1113 ) -> Source {
1114 Source {
1115 create_sql: Some(plan.source.create_sql),
1116 data_source: match plan.source.data_source {
1117 mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1118 desc,
1119 cluster_id: plan
1120 .in_cluster
1121 .expect("ingestion-based sources must be given a cluster ID"),
1122 },
1123 mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1124 desc,
1125 progress_subsource,
1126 data_config,
1127 details,
1128 } => DataSourceDesc::OldSyntaxIngestion {
1129 desc,
1130 cluster_id: plan
1131 .in_cluster
1132 .expect("ingestion-based sources must be given a cluster ID"),
1133 progress_subsource,
1134 data_config,
1135 details,
1136 },
1137 mz_sql::plan::DataSourceDesc::Progress => {
1138 assert!(
1139 plan.in_cluster.is_none(),
1140 "subsources must not have a host config or cluster_id defined"
1141 );
1142 DataSourceDesc::Progress
1143 }
1144 mz_sql::plan::DataSourceDesc::IngestionExport {
1145 ingestion_id,
1146 external_reference,
1147 details,
1148 data_config,
1149 } => {
1150 assert!(
1151 plan.in_cluster.is_none(),
1152 "subsources must not have a host config or cluster_id defined"
1153 );
1154 DataSourceDesc::IngestionExport {
1155 ingestion_id,
1156 external_reference,
1157 details,
1158 data_config,
1159 }
1160 }
1161 mz_sql::plan::DataSourceDesc::Webhook {
1162 validate_using,
1163 body_format,
1164 headers,
1165 cluster_id,
1166 } => {
1167 mz_ore::soft_assert_or_log!(
1168 cluster_id.is_none(),
1169 "cluster_id set at Source level for Webhooks"
1170 );
1171 DataSourceDesc::Webhook {
1172 validate_using,
1173 body_format,
1174 headers,
1175 cluster_id: plan
1176 .in_cluster
1177 .expect("webhook sources must be given a cluster ID"),
1178 }
1179 }
1180 },
1181 desc: plan.source.desc,
1182 global_id,
1183 timeline: plan.timeline,
1184 resolved_ids,
1185 custom_logical_compaction_window: plan
1186 .source
1187 .compaction_window
1188 .or(custom_logical_compaction_window),
1189 is_retained_metrics_object,
1190 }
1191 }
1192
1193 pub fn source_type(&self) -> &str {
1195 match &self.data_source {
1196 DataSourceDesc::Ingestion { desc, .. }
1197 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
1198 DataSourceDesc::Progress => "progress",
1199 DataSourceDesc::IngestionExport { .. } => "subsource",
1200 DataSourceDesc::Introspection(_) => "source",
1201 DataSourceDesc::Webhook { .. } => "webhook",
1202 }
1203 }
1204
1205 pub fn connection_id(&self) -> Option<CatalogItemId> {
1207 match &self.data_source {
1208 DataSourceDesc::Ingestion { desc, .. }
1209 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
1210 DataSourceDesc::IngestionExport { .. }
1211 | DataSourceDesc::Introspection(_)
1212 | DataSourceDesc::Webhook { .. }
1213 | DataSourceDesc::Progress => None,
1214 }
1215 }
1216
1217 pub fn global_id(&self) -> GlobalId {
1219 self.global_id
1220 }
1221
1222 pub fn user_controllable_persist_shard_count(&self) -> i64 {
1230 match &self.data_source {
1231 DataSourceDesc::Ingestion { .. } => 0,
1232 DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1233 match &desc.connection {
1234 GenericSourceConnection::Postgres(_)
1238 | GenericSourceConnection::MySql(_)
1239 | GenericSourceConnection::SqlServer(_) => 0,
1240 GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
1241 LoadGenerator::Clock
1243 | LoadGenerator::Counter { .. }
1244 | LoadGenerator::Datums
1245 | LoadGenerator::KeyValue(_) => 1,
1246 LoadGenerator::Auction
1247 | LoadGenerator::Marketing
1248 | LoadGenerator::Tpch { .. } => 0,
1249 },
1250 GenericSourceConnection::Kafka(_) => 1,
1251 }
1252 }
1253 DataSourceDesc::IngestionExport { .. } => 1,
1256 DataSourceDesc::Webhook { .. } => 1,
1257 DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => 0,
1260 }
1261 }
1262}
1263
/// Catalog definition of a log.
#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// Which log this is.
    pub variant: LogVariant,
    /// [`GlobalId`] of the log; returned by [`Log::global_id`].
    pub global_id: GlobalId,
}
1271
1272impl Log {
1273 pub fn global_id(&self) -> GlobalId {
1275 self.global_id
1276 }
1277}
1278
/// Catalog definition of a sink.
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    /// SQL that defines the sink.
    pub create_sql: String,
    /// [`GlobalId`] of the sink; returned by [`Sink::global_id`].
    pub global_id: GlobalId,
    /// [`GlobalId`] of the collection the sink reads from.
    pub from: GlobalId,
    /// Connection the sink writes to.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// Envelope used by the sink.
    pub envelope: SinkEnvelope,
    /// Snapshot flag of the sink.
    /// NOTE(review): presumably whether an initial snapshot is emitted — confirm.
    pub with_snapshot: bool,
    /// Version number of the sink.
    pub version: u64,
    /// Ids resolved while planning the sink's definition.
    pub resolved_ids: ResolvedIds,
    /// Cluster the sink is assigned to.
    pub cluster_id: ClusterId,
}
1302
1303impl Sink {
1304 pub fn sink_type(&self) -> &str {
1305 self.connection.name()
1306 }
1307
1308 pub fn envelope(&self) -> Option<&str> {
1310 match &self.envelope {
1311 SinkEnvelope::Debezium => Some("debezium"),
1312 SinkEnvelope::Upsert => Some("upsert"),
1313 }
1314 }
1315
1316 pub fn combined_format(&self) -> Option<Cow<'_, str>> {
1321 match &self.connection {
1322 StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
1323 _ => None,
1324 }
1325 }
1326
1327 pub fn formats(&self) -> Option<(Option<&str>, &str)> {
1329 match &self.connection {
1330 StorageSinkConnection::Kafka(connection) => {
1331 let key_format = connection
1332 .format
1333 .key_format
1334 .as_ref()
1335 .map(|f| f.get_format_name());
1336 let value_format = connection.format.value_format.get_format_name();
1337 Some((key_format, value_format))
1338 }
1339 _ => None,
1340 }
1341 }
1342
1343 pub fn connection_id(&self) -> Option<CatalogItemId> {
1344 self.connection.connection_id()
1345 }
1346
1347 pub fn global_id(&self) -> GlobalId {
1349 self.global_id
1350 }
1351}
1352
/// In-memory catalog representation of a (non-materialized) view.
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// SQL text defining this view; the catalog's rename and `update_sql`
    /// helpers parse and rewrite it.
    pub create_sql: String,
    /// The [`GlobalId`] associated with this view.
    pub global_id: GlobalId,
    /// The view's expression in HIR form.
    pub raw_expr: Arc<HirRelationExpr>,
    /// The view's expression in optimized MIR form.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Relation description returned for this view by `CatalogItem::desc_opt`.
    pub desc: RelationDesc,
    /// Connection ID attached to the view; when `Some`,
    /// `CatalogItem::is_temporary` reports the view as temporary.
    pub conn_id: Option<ConnectionId>,
    /// Items referenced by this view; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Additional item IDs that `CatalogItem::uses` merges with `resolved_ids`.
    pub dependencies: DependencyIds,
}
1372
1373impl View {
1374 pub fn global_id(&self) -> GlobalId {
1376 self.global_id
1377 }
1378}
1379
/// In-memory catalog representation of a materialized view, which may have
/// several versions, each backed by its own [`GlobalId`].
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// SQL text defining this materialized view; the catalog's rename and
    /// `update_sql` helpers parse and rewrite it.
    pub create_sql: String,
    /// Maps each relation version to its [`GlobalId`]. Methods on this type
    /// expect at least one entry. Serialized with keys converted to strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// The view's expression in HIR form.
    pub raw_expr: Arc<HirRelationExpr>,
    /// The view's expression in optimized MIR form.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Versioned relation description; `at_version` yields the description for
    /// a particular version.
    pub desc: VersionedRelationDesc,
    /// Items referenced by this view; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Additional item IDs that `CatalogItem::uses` merges with `resolved_ids`.
    pub dependencies: DependencyIds,
    /// When `Some`, the item this materialized view is a replacement for;
    /// `apply_replacement` requires it to be set on the replacement.
    pub replacement_target: Option<CatalogItemId>,
    /// Cluster reported for this materialized view by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// NOTE(review): presumably column indexes asserted to be non-null — confirm.
    pub non_null_assertions: Vec<usize>,
    /// Custom compaction window, if one was set; `None` means the default
    /// window is used (see `CatalogItem::initial_logical_compaction_window`).
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Refresh schedule, if any.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// NOTE(review): presumably the as-of frontier chosen at creation — confirm.
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1415
1416impl MaterializedView {
1417 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1419 self.collections.values().copied()
1420 }
1421
1422 pub fn global_id_writes(&self) -> GlobalId {
1425 *self
1426 .collections
1427 .last_key_value()
1428 .expect("at least one version of a materialized view")
1429 .1
1430 }
1431
1432 pub fn collection_descs(
1434 &self,
1435 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
1436 self.collections.iter().map(|(version, gid)| {
1437 let desc = self
1438 .desc
1439 .at_version(RelationVersionSelector::Specific(*version));
1440 (*gid, *version, desc)
1441 })
1442 }
1443
1444 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
1446 let (version, _gid) = self
1447 .collections
1448 .iter()
1449 .find(|(_version, gid)| *gid == id)
1450 .expect("GlobalId to exist");
1451 self.desc
1452 .at_version(RelationVersionSelector::Specific(*version))
1453 }
1454
1455 pub fn apply_replacement(&mut self, replacement: Self) {
1457 let target_id = replacement
1458 .replacement_target
1459 .expect("replacement has target");
1460
1461 fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
1462 let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
1463 panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
1464 });
1465 if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
1466 cmvs
1467 } else {
1468 panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
1469 }
1470 }
1471
1472 let old_stmt = parse(&self.create_sql);
1473 let rpl_stmt = parse(&replacement.create_sql);
1474 let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
1475 if_exists: old_stmt.if_exists,
1476 name: old_stmt.name,
1477 columns: rpl_stmt.columns,
1478 replacing: None,
1479 in_cluster: rpl_stmt.in_cluster,
1480 query: rpl_stmt.query,
1481 as_of: rpl_stmt.as_of,
1482 with_options: rpl_stmt.with_options,
1483 };
1484 let create_sql = new_stmt.to_ast_string_stable();
1485
1486 let mut collections = std::mem::take(&mut self.collections);
1487 let latest_version = collections.keys().max().expect("at least one version");
1491 let new_version = latest_version.bump();
1492 collections.insert(new_version, replacement.global_id_writes());
1493
1494 let mut resolved_ids = replacement.resolved_ids;
1495 resolved_ids.remove_item(&target_id);
1496 let mut dependencies = replacement.dependencies;
1497 dependencies.0.remove(&target_id);
1498
1499 *self = Self {
1500 create_sql,
1501 collections,
1502 raw_expr: replacement.raw_expr,
1503 optimized_expr: replacement.optimized_expr,
1504 desc: replacement.desc,
1505 resolved_ids,
1506 dependencies,
1507 replacement_target: None,
1508 cluster_id: replacement.cluster_id,
1509 non_null_assertions: replacement.non_null_assertions,
1510 custom_logical_compaction_window: replacement.custom_logical_compaction_window,
1511 refresh_schedule: replacement.refresh_schedule,
1512 initial_as_of: replacement.initial_as_of,
1513 };
1514 }
1515}
1516
/// In-memory catalog representation of an index.
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// SQL text defining this index; the catalog's rename and `update_sql`
    /// helpers parse and rewrite it.
    pub create_sql: String,
    /// The [`GlobalId`] associated with this index.
    pub global_id: GlobalId,
    /// [`GlobalId`] of the collection this index is on.
    pub on: GlobalId,
    /// Key expressions of the index.
    pub keys: Arc<[MirScalarExpr]>,
    /// Connection ID attached to the index; when `Some`,
    /// `CatalogItem::is_temporary` reports the index as temporary.
    pub conn_id: Option<ConnectionId>,
    /// Items referenced by this index; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Cluster reported for this index by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// Custom compaction window, if one was set; `None` means the default
    /// window is used (see `CatalogItem::initial_logical_compaction_window`).
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Flag surfaced by `CatalogItem::is_retained_metrics_object`.
    pub is_retained_metrics_object: bool,
}
1541
1542impl Index {
1543 pub fn global_id(&self) -> GlobalId {
1545 self.global_id
1546 }
1547}
1548
/// In-memory catalog representation of a type.
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// SQL text defining this type, if any. Serialization of the item expects
    /// it to be present and treats `None` as a builtin type.
    pub create_sql: Option<String>,
    /// The [`GlobalId`] associated with this type.
    pub global_id: GlobalId,
    /// Details of the type; excluded from serialization.
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// Items referenced by this type; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
}
1560
/// In-memory catalog representation of a function.
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// The statically allocated function definition; excluded from serialization.
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
    /// The [`GlobalId`] associated with this function.
    pub global_id: GlobalId,
}
1569
/// In-memory catalog representation of a secret.
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// SQL text defining this secret; the catalog's rename and `update_sql`
    /// helpers parse and rewrite it.
    pub create_sql: String,
    /// The [`GlobalId`] associated with this secret.
    pub global_id: GlobalId,
}
1577
/// In-memory catalog representation of a connection.
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// SQL text defining this connection; the catalog's rename and
    /// `update_sql` helpers parse and rewrite it.
    pub create_sql: String,
    /// The [`GlobalId`] associated with this connection.
    pub global_id: GlobalId,
    /// Details describing the connection.
    pub details: ConnectionDetails,
    /// Items referenced by this connection; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
}
1589
1590impl Connection {
1591 pub fn global_id(&self) -> GlobalId {
1593 self.global_id
1594 }
1595}
1596
/// In-memory catalog representation of a continual task.
#[derive(Debug, Clone, Serialize)]
pub struct ContinualTask {
    /// SQL text defining this continual task; the catalog's rename and
    /// `update_sql` helpers parse and rewrite it.
    pub create_sql: String,
    /// The [`GlobalId`] associated with this continual task.
    pub global_id: GlobalId,
    /// NOTE(review): presumably the [`GlobalId`] of the task's input collection — confirm.
    pub input_id: GlobalId,
    /// NOTE(review): presumably whether the input's snapshot is processed — confirm.
    pub with_snapshot: bool,
    /// The task's expression in HIR form.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Relation description returned for this task by `CatalogItem::desc_opt`.
    pub desc: RelationDesc,
    /// Items referenced by this task; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Additional item IDs that `CatalogItem::uses` merges with `resolved_ids`.
    pub dependencies: DependencyIds,
    /// Cluster reported for this task by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// NOTE(review): presumably the as-of frontier chosen at creation — confirm.
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1622
1623impl ContinualTask {
1624 pub fn global_id(&self) -> GlobalId {
1626 self.global_id
1627 }
1628}
1629
/// In-memory catalog representation of a network policy. Converts to and from
/// `durable::NetworkPolicy`, which carries the same set of fields.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
    /// Name of the policy.
    pub name: String,
    /// ID of the policy.
    pub id: NetworkPolicyId,
    /// OID of the policy.
    pub oid: u32,
    /// Rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
    /// Role that owns the policy.
    pub owner_id: RoleId,
    /// Privileges on the policy; flattened into individual ACL items when
    /// converted to the durable form.
    pub privileges: PrivilegeMap,
}
1639
1640impl From<NetworkPolicy> for durable::NetworkPolicy {
1641 fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1642 durable::NetworkPolicy {
1643 id: policy.id,
1644 oid: policy.oid,
1645 name: policy.name,
1646 rules: policy.rules,
1647 owner_id: policy.owner_id,
1648 privileges: policy.privileges.into_all_values().collect(),
1649 }
1650 }
1651}
1652
1653impl From<durable::NetworkPolicy> for NetworkPolicy {
1654 fn from(
1655 durable::NetworkPolicy {
1656 id,
1657 oid,
1658 name,
1659 rules,
1660 owner_id,
1661 privileges,
1662 }: durable::NetworkPolicy,
1663 ) -> Self {
1664 NetworkPolicy {
1665 id,
1666 oid,
1667 name,
1668 rules,
1669 owner_id,
1670 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1671 }
1672 }
1673}
1674
1675impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1676 fn update_from(
1677 &mut self,
1678 durable::NetworkPolicy {
1679 id,
1680 oid,
1681 name,
1682 rules,
1683 owner_id,
1684 privileges,
1685 }: durable::NetworkPolicy,
1686 ) {
1687 self.id = id;
1688 self.oid = oid;
1689 self.name = name;
1690 self.rules = rules;
1691 self.owner_id = owner_id;
1692 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1693 }
1694}
1695
1696impl CatalogItem {
1697 pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1699 match self {
1700 CatalogItem::Table(_) => CatalogItemType::Table,
1701 CatalogItem::Source(_) => CatalogItemType::Source,
1702 CatalogItem::Log(_) => CatalogItemType::Source,
1703 CatalogItem::Sink(_) => CatalogItemType::Sink,
1704 CatalogItem::View(_) => CatalogItemType::View,
1705 CatalogItem::MaterializedView(_) => CatalogItemType::MaterializedView,
1706 CatalogItem::Index(_) => CatalogItemType::Index,
1707 CatalogItem::Type(_) => CatalogItemType::Type,
1708 CatalogItem::Func(_) => CatalogItemType::Func,
1709 CatalogItem::Secret(_) => CatalogItemType::Secret,
1710 CatalogItem::Connection(_) => CatalogItemType::Connection,
1711 CatalogItem::ContinualTask(_) => CatalogItemType::ContinualTask,
1712 }
1713 }
1714
1715 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1717 let gid = match self {
1718 CatalogItem::Source(source) => source.global_id,
1719 CatalogItem::Log(log) => log.global_id,
1720 CatalogItem::Sink(sink) => sink.global_id,
1721 CatalogItem::View(view) => view.global_id,
1722 CatalogItem::MaterializedView(mv) => {
1723 return itertools::Either::Left(mv.collections.values().copied());
1724 }
1725 CatalogItem::ContinualTask(ct) => ct.global_id,
1726 CatalogItem::Index(index) => index.global_id,
1727 CatalogItem::Func(func) => func.global_id,
1728 CatalogItem::Type(ty) => ty.global_id,
1729 CatalogItem::Secret(secret) => secret.global_id,
1730 CatalogItem::Connection(conn) => conn.global_id,
1731 CatalogItem::Table(table) => {
1732 return itertools::Either::Left(table.collections.values().copied());
1733 }
1734 };
1735 itertools::Either::Right(std::iter::once(gid))
1736 }
1737
1738 pub fn latest_global_id(&self) -> GlobalId {
1742 match self {
1743 CatalogItem::Source(source) => source.global_id,
1744 CatalogItem::Log(log) => log.global_id,
1745 CatalogItem::Sink(sink) => sink.global_id,
1746 CatalogItem::View(view) => view.global_id,
1747 CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
1748 CatalogItem::ContinualTask(ct) => ct.global_id,
1749 CatalogItem::Index(index) => index.global_id,
1750 CatalogItem::Func(func) => func.global_id,
1751 CatalogItem::Type(ty) => ty.global_id,
1752 CatalogItem::Secret(secret) => secret.global_id,
1753 CatalogItem::Connection(conn) => conn.global_id,
1754 CatalogItem::Table(table) => table.global_id_writes(),
1755 }
1756 }
1757
1758 pub fn is_storage_collection(&self) -> bool {
1760 match self {
1761 CatalogItem::Table(_)
1762 | CatalogItem::Source(_)
1763 | CatalogItem::MaterializedView(_)
1764 | CatalogItem::Sink(_)
1765 | CatalogItem::ContinualTask(_) => true,
1766 CatalogItem::Log(_)
1767 | CatalogItem::View(_)
1768 | CatalogItem::Index(_)
1769 | CatalogItem::Type(_)
1770 | CatalogItem::Func(_)
1771 | CatalogItem::Secret(_)
1772 | CatalogItem::Connection(_) => false,
1773 }
1774 }
1775
1776 pub fn desc(
1785 &self,
1786 name: &FullItemName,
1787 version: RelationVersionSelector,
1788 ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
1789 self.desc_opt(version)
1790 .ok_or_else(|| SqlCatalogError::InvalidDependency {
1791 name: name.to_string(),
1792 typ: self.typ(),
1793 })
1794 }
1795
1796 pub fn desc_opt(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1797 match &self {
1798 CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1799 CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1800 CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1801 CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1802 CatalogItem::MaterializedView(mview) => {
1803 Some(Cow::Owned(mview.desc.at_version(version)))
1804 }
1805 CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
1806 CatalogItem::Func(_)
1807 | CatalogItem::Index(_)
1808 | CatalogItem::Sink(_)
1809 | CatalogItem::Secret(_)
1810 | CatalogItem::Connection(_)
1811 | CatalogItem::Type(_) => None,
1812 }
1813 }
1814
1815 pub fn func(
1816 &self,
1817 entry: &CatalogEntry,
1818 ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1819 match &self {
1820 CatalogItem::Func(func) => Ok(func.inner),
1821 _ => Err(SqlCatalogError::UnexpectedType {
1822 name: entry.name().item.to_string(),
1823 actual_type: entry.item_type(),
1824 expected_type: CatalogItemType::Func,
1825 }),
1826 }
1827 }
1828
1829 pub fn source_desc(
1830 &self,
1831 entry: &CatalogEntry,
1832 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1833 match &self {
1834 CatalogItem::Source(source) => match &source.data_source {
1835 DataSourceDesc::Ingestion { desc, .. }
1836 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1837 DataSourceDesc::IngestionExport { .. }
1838 | DataSourceDesc::Introspection(_)
1839 | DataSourceDesc::Webhook { .. }
1840 | DataSourceDesc::Progress => Ok(None),
1841 },
1842 _ => Err(SqlCatalogError::UnexpectedType {
1843 name: entry.name().item.to_string(),
1844 actual_type: entry.item_type(),
1845 expected_type: CatalogItemType::Source,
1846 }),
1847 }
1848 }
1849
1850 pub fn is_progress_source(&self) -> bool {
1852 matches!(
1853 self,
1854 CatalogItem::Source(Source {
1855 data_source: DataSourceDesc::Progress,
1856 ..
1857 })
1858 )
1859 }
1860
1861 pub fn references(&self) -> &ResolvedIds {
1864 static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1865 match self {
1866 CatalogItem::Func(_) => &*EMPTY,
1867 CatalogItem::Index(idx) => &idx.resolved_ids,
1868 CatalogItem::Sink(sink) => &sink.resolved_ids,
1869 CatalogItem::Source(source) => &source.resolved_ids,
1870 CatalogItem::Log(_) => &*EMPTY,
1871 CatalogItem::Table(table) => &table.resolved_ids,
1872 CatalogItem::Type(typ) => &typ.resolved_ids,
1873 CatalogItem::View(view) => &view.resolved_ids,
1874 CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1875 CatalogItem::Secret(_) => &*EMPTY,
1876 CatalogItem::Connection(connection) => &connection.resolved_ids,
1877 CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
1878 }
1879 }
1880
1881 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1887 let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1888 match self {
1889 CatalogItem::Func(_) => {}
1892 CatalogItem::Index(_) => {}
1893 CatalogItem::Sink(_) => {}
1894 CatalogItem::Source(_) => {}
1895 CatalogItem::Log(_) => {}
1896 CatalogItem::Table(_) => {}
1897 CatalogItem::Type(_) => {}
1898 CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1899 CatalogItem::MaterializedView(mview) => {
1900 uses.extend(mview.dependencies.0.iter().copied())
1901 }
1902 CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
1903 CatalogItem::Secret(_) => {}
1904 CatalogItem::Connection(_) => {}
1905 }
1906 uses
1907 }
1908
1909 pub fn conn_id(&self) -> Option<&ConnectionId> {
1912 match self {
1913 CatalogItem::View(view) => view.conn_id.as_ref(),
1914 CatalogItem::Index(index) => index.conn_id.as_ref(),
1915 CatalogItem::Table(table) => table.conn_id.as_ref(),
1916 CatalogItem::Log(_)
1917 | CatalogItem::Source(_)
1918 | CatalogItem::Sink(_)
1919 | CatalogItem::MaterializedView(_)
1920 | CatalogItem::Secret(_)
1921 | CatalogItem::Type(_)
1922 | CatalogItem::Func(_)
1923 | CatalogItem::Connection(_)
1924 | CatalogItem::ContinualTask(_) => None,
1925 }
1926 }
1927
1928 pub fn set_conn_id(&mut self, conn_id: Option<ConnectionId>) {
1931 match self {
1932 CatalogItem::View(view) => view.conn_id = conn_id,
1933 CatalogItem::Index(index) => index.conn_id = conn_id,
1934 CatalogItem::Table(table) => table.conn_id = conn_id,
1935 CatalogItem::Log(_)
1936 | CatalogItem::Source(_)
1937 | CatalogItem::Sink(_)
1938 | CatalogItem::MaterializedView(_)
1939 | CatalogItem::Secret(_)
1940 | CatalogItem::Type(_)
1941 | CatalogItem::Func(_)
1942 | CatalogItem::Connection(_)
1943 | CatalogItem::ContinualTask(_) => (),
1944 }
1945 }
1946
1947 pub fn is_temporary(&self) -> bool {
1949 self.conn_id().is_some()
1950 }
1951
1952 pub fn rename_schema_refs(
1953 &self,
1954 database_name: &str,
1955 cur_schema_name: &str,
1956 new_schema_name: &str,
1957 ) -> Result<CatalogItem, (String, String)> {
1958 let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
1959 let mut create_stmt = mz_sql::parse::parse(&create_sql)
1960 .expect("invalid create sql persisted to catalog")
1961 .into_element()
1962 .ast;
1963
1964 mz_sql::ast::transform::create_stmt_rename_schema_refs(
1966 &mut create_stmt,
1967 database_name,
1968 cur_schema_name,
1969 new_schema_name,
1970 )?;
1971
1972 Ok(create_stmt.to_ast_string_stable())
1973 };
1974
1975 match self {
1976 CatalogItem::Table(i) => {
1977 let mut i = i.clone();
1978 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1979 Ok(CatalogItem::Table(i))
1980 }
1981 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1982 CatalogItem::Source(i) => {
1983 let mut i = i.clone();
1984 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1985 Ok(CatalogItem::Source(i))
1986 }
1987 CatalogItem::Sink(i) => {
1988 let mut i = i.clone();
1989 i.create_sql = do_rewrite(i.create_sql)?;
1990 Ok(CatalogItem::Sink(i))
1991 }
1992 CatalogItem::View(i) => {
1993 let mut i = i.clone();
1994 i.create_sql = do_rewrite(i.create_sql)?;
1995 Ok(CatalogItem::View(i))
1996 }
1997 CatalogItem::MaterializedView(i) => {
1998 let mut i = i.clone();
1999 i.create_sql = do_rewrite(i.create_sql)?;
2000 Ok(CatalogItem::MaterializedView(i))
2001 }
2002 CatalogItem::Index(i) => {
2003 let mut i = i.clone();
2004 i.create_sql = do_rewrite(i.create_sql)?;
2005 Ok(CatalogItem::Index(i))
2006 }
2007 CatalogItem::Secret(i) => {
2008 let mut i = i.clone();
2009 i.create_sql = do_rewrite(i.create_sql)?;
2010 Ok(CatalogItem::Secret(i))
2011 }
2012 CatalogItem::Connection(i) => {
2013 let mut i = i.clone();
2014 i.create_sql = do_rewrite(i.create_sql)?;
2015 Ok(CatalogItem::Connection(i))
2016 }
2017 CatalogItem::Type(i) => {
2018 let mut i = i.clone();
2019 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2020 Ok(CatalogItem::Type(i))
2021 }
2022 CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
2023 CatalogItem::ContinualTask(i) => {
2024 let mut i = i.clone();
2025 i.create_sql = do_rewrite(i.create_sql)?;
2026 Ok(CatalogItem::ContinualTask(i))
2027 }
2028 }
2029 }
2030
2031 pub fn rename_item_refs(
2035 &self,
2036 from: FullItemName,
2037 to_item_name: String,
2038 rename_self: bool,
2039 ) -> Result<CatalogItem, String> {
2040 let do_rewrite = |create_sql: String| -> Result<String, String> {
2041 let mut create_stmt = mz_sql::parse::parse(&create_sql)
2042 .expect("invalid create sql persisted to catalog")
2043 .into_element()
2044 .ast;
2045 if rename_self {
2046 mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
2047 }
2048 mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
2050 Ok(create_stmt.to_ast_string_stable())
2051 };
2052
2053 match self {
2054 CatalogItem::Table(i) => {
2055 let mut i = i.clone();
2056 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2057 Ok(CatalogItem::Table(i))
2058 }
2059 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
2060 CatalogItem::Source(i) => {
2061 let mut i = i.clone();
2062 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
2063 Ok(CatalogItem::Source(i))
2064 }
2065 CatalogItem::Sink(i) => {
2066 let mut i = i.clone();
2067 i.create_sql = do_rewrite(i.create_sql)?;
2068 Ok(CatalogItem::Sink(i))
2069 }
2070 CatalogItem::View(i) => {
2071 let mut i = i.clone();
2072 i.create_sql = do_rewrite(i.create_sql)?;
2073 Ok(CatalogItem::View(i))
2074 }
2075 CatalogItem::MaterializedView(i) => {
2076 let mut i = i.clone();
2077 i.create_sql = do_rewrite(i.create_sql)?;
2078 Ok(CatalogItem::MaterializedView(i))
2079 }
2080 CatalogItem::Index(i) => {
2081 let mut i = i.clone();
2082 i.create_sql = do_rewrite(i.create_sql)?;
2083 Ok(CatalogItem::Index(i))
2084 }
2085 CatalogItem::Secret(i) => {
2086 let mut i = i.clone();
2087 i.create_sql = do_rewrite(i.create_sql)?;
2088 Ok(CatalogItem::Secret(i))
2089 }
2090 CatalogItem::Func(_) | CatalogItem::Type(_) => {
2091 unreachable!("{}s cannot be renamed", self.typ())
2092 }
2093 CatalogItem::Connection(i) => {
2094 let mut i = i.clone();
2095 i.create_sql = do_rewrite(i.create_sql)?;
2096 Ok(CatalogItem::Connection(i))
2097 }
2098 CatalogItem::ContinualTask(i) => {
2099 let mut i = i.clone();
2100 i.create_sql = do_rewrite(i.create_sql)?;
2101 Ok(CatalogItem::ContinualTask(i))
2102 }
2103 }
2104 }
2105
/// Updates the `RetainHistory` option in this item's `create_sql` and stores
/// `window` as the item's custom logical compaction window.
///
/// With `value` set, the last `RetainHistory` option is replaced (or a new one
/// is appended if none exists). With `value` unset, the last `RetainHistory`
/// option is removed if present. In both cases the value of the option that
/// was replaced or removed is returned (`None` if there was none).
///
/// # Errors
///
/// Returns `Err(())` if the item has no rewritable `create_sql` or if its
/// statement is not `CREATE TABLE`, `CREATE INDEX`, `CREATE SOURCE`, or
/// `CREATE MATERIALIZED VIEW`.
///
/// # Panics
///
/// Panics if the SQL rewrite succeeded but the item has no compaction window
/// slot.
pub fn update_retain_history(
    &mut self,
    value: Option<Value>,
    window: CompactionWindow,
) -> Result<Option<WithOptionValue<Raw>>, ()> {
    let update = |mut ast: &mut Statement<Raw>| {
        // Each statement kind has its own option and option-name types, so the
        // shared logic is stamped out per kind via a macro.
        macro_rules! update_retain_history {
            ( $stmt:ident, $opt:ident, $name:ident ) => {{
                // Locate the last occurrence of the option, if any.
                let pos = $stmt
                    .with_options
                    .iter()
                    .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
                if let Some(value) = value {
                    let next = mz_sql_parser::ast::$opt {
                        name: mz_sql_parser::ast::$name::RetainHistory,
                        value: Some(WithOptionValue::RetainHistoryFor(value)),
                    };
                    if let Some(idx) = pos {
                        // Replace in place and hand back the old value.
                        let previous = $stmt.with_options[idx].clone();
                        $stmt.with_options[idx] = next;
                        previous.value
                    } else {
                        $stmt.with_options.push(next);
                        None
                    }
                } else {
                    if let Some(idx) = pos {
                        // `swap_remove` moves the last option into the vacated
                        // slot, so the order of the remaining options may change.
                        $stmt.with_options.swap_remove(idx).value
                    } else {
                        None
                    }
                }
            }};
        }
        let previous = match &mut ast {
            Statement::CreateTable(stmt) => {
                update_retain_history!(stmt, TableOption, TableOptionName)
            }
            Statement::CreateIndex(stmt) => {
                update_retain_history!(stmt, IndexOption, IndexOptionName)
            }
            Statement::CreateSource(stmt) => {
                update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
            }
            Statement::CreateMaterializedView(stmt) => {
                update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
            }
            _ => {
                return Err(());
            }
        };
        Ok(previous)
    };

    let res = self.update_sql(update)?;
    // The window is only written after the SQL rewrite has succeeded.
    let cw = self
        .custom_logical_compaction_window_mut()
        .expect("item must have compaction window");
    *cw = Some(window);
    Ok(res)
}
2172
2173 pub fn add_column(
2174 &mut self,
2175 name: ColumnName,
2176 typ: SqlColumnType,
2177 sql: RawDataType,
2178 ) -> Result<RelationVersion, PlanError> {
2179 let CatalogItem::Table(table) = self else {
2180 return Err(PlanError::Unsupported {
2181 feature: "adding columns to a non-Table".to_string(),
2182 discussion_no: None,
2183 });
2184 };
2185 let next_version = table.desc.add_column(name.clone(), typ);
2186
2187 let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2188 Statement::CreateTable(stmt) => {
2189 let version = ColumnOptionDef {
2190 name: None,
2191 option: ColumnOption::Versioned {
2192 action: ColumnVersioned::Added,
2193 version: next_version.into(),
2194 },
2195 };
2196 let column = ColumnDef {
2197 name: name.into(),
2198 data_type: sql,
2199 collation: None,
2200 options: vec![version],
2201 };
2202 stmt.columns.push(column);
2203 Ok(())
2204 }
2205 _ => Err(()),
2206 };
2207
2208 self.update_sql(update)
2209 .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2210 Ok(next_version)
2211 }
2212
2213 pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2216 where
2217 F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2218 {
2219 let create_sql = match self {
2220 CatalogItem::Table(Table { create_sql, .. })
2221 | CatalogItem::Type(Type { create_sql, .. })
2222 | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2223 CatalogItem::Sink(Sink { create_sql, .. })
2224 | CatalogItem::View(View { create_sql, .. })
2225 | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2226 | CatalogItem::Index(Index { create_sql, .. })
2227 | CatalogItem::Secret(Secret { create_sql, .. })
2228 | CatalogItem::Connection(Connection { create_sql, .. })
2229 | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
2230 CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2231 };
2232 let Some(create_sql) = create_sql else {
2233 return Err(());
2234 };
2235 let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2236 .expect("non-system items must be parseable")
2237 .into_element()
2238 .ast;
2239 debug!("rewrite: {}", ast.to_ast_string_redacted());
2240 let t = f(&mut ast)?;
2241 *create_sql = ast.to_ast_string_stable();
2242 debug!("rewrote: {}", ast.to_ast_string_redacted());
2243 Ok(t)
2244 }
2245
2246 pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2253 match self {
2254 CatalogItem::Index(index) => Some(index.cluster_id),
2255 CatalogItem::Table(_)
2256 | CatalogItem::Source(_)
2257 | CatalogItem::Log(_)
2258 | CatalogItem::View(_)
2259 | CatalogItem::MaterializedView(_)
2260 | CatalogItem::Sink(_)
2261 | CatalogItem::Type(_)
2262 | CatalogItem::Func(_)
2263 | CatalogItem::Secret(_)
2264 | CatalogItem::Connection(_)
2265 | CatalogItem::ContinualTask(_) => None,
2266 }
2267 }
2268
2269 pub fn cluster_id(&self) -> Option<ClusterId> {
2270 match self {
2271 CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2272 CatalogItem::Index(index) => Some(index.cluster_id),
2273 CatalogItem::Source(source) => match &source.data_source {
2274 DataSourceDesc::Ingestion { cluster_id, .. }
2275 | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
2276 DataSourceDesc::IngestionExport { .. } => None,
2280 DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2281 DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => None,
2282 },
2283 CatalogItem::Sink(sink) => Some(sink.cluster_id),
2284 CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
2285 CatalogItem::Table(_)
2286 | CatalogItem::Log(_)
2287 | CatalogItem::View(_)
2288 | CatalogItem::Type(_)
2289 | CatalogItem::Func(_)
2290 | CatalogItem::Secret(_)
2291 | CatalogItem::Connection(_) => None,
2292 }
2293 }
2294
2295 pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2298 match self {
2299 CatalogItem::Table(table) => table.custom_logical_compaction_window,
2300 CatalogItem::Source(source) => source.custom_logical_compaction_window,
2301 CatalogItem::Index(index) => index.custom_logical_compaction_window,
2302 CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2303 CatalogItem::Log(_)
2304 | CatalogItem::View(_)
2305 | CatalogItem::Sink(_)
2306 | CatalogItem::Type(_)
2307 | CatalogItem::Func(_)
2308 | CatalogItem::Secret(_)
2309 | CatalogItem::Connection(_)
2310 | CatalogItem::ContinualTask(_) => None,
2311 }
2312 }
2313
2314 pub fn custom_logical_compaction_window_mut(
2318 &mut self,
2319 ) -> Option<&mut Option<CompactionWindow>> {
2320 let cw = match self {
2321 CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2322 CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2323 CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2324 CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2325 CatalogItem::Log(_)
2326 | CatalogItem::View(_)
2327 | CatalogItem::Sink(_)
2328 | CatalogItem::Type(_)
2329 | CatalogItem::Func(_)
2330 | CatalogItem::Secret(_)
2331 | CatalogItem::Connection(_)
2332 | CatalogItem::ContinualTask(_) => return None,
2333 };
2334 Some(cw)
2335 }
2336
2337 pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2345 let custom_logical_compaction_window = match self {
2346 CatalogItem::Table(_)
2347 | CatalogItem::Source(_)
2348 | CatalogItem::Index(_)
2349 | CatalogItem::MaterializedView(_)
2350 | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
2351 CatalogItem::Log(_)
2352 | CatalogItem::View(_)
2353 | CatalogItem::Sink(_)
2354 | CatalogItem::Type(_)
2355 | CatalogItem::Func(_)
2356 | CatalogItem::Secret(_)
2357 | CatalogItem::Connection(_) => return None,
2358 };
2359 Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2360 }
2361
2362 pub fn is_retained_metrics_object(&self) -> bool {
2366 match self {
2367 CatalogItem::Table(table) => table.is_retained_metrics_object,
2368 CatalogItem::Source(source) => source.is_retained_metrics_object,
2369 CatalogItem::Index(index) => index.is_retained_metrics_object,
2370 CatalogItem::Log(_)
2371 | CatalogItem::View(_)
2372 | CatalogItem::MaterializedView(_)
2373 | CatalogItem::Sink(_)
2374 | CatalogItem::Type(_)
2375 | CatalogItem::Func(_)
2376 | CatalogItem::Secret(_)
2377 | CatalogItem::Connection(_)
2378 | CatalogItem::ContinualTask(_) => false,
2379 }
2380 }
2381
2382 pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2383 match self {
2384 CatalogItem::Table(table) => {
2385 let create_sql = table
2386 .create_sql
2387 .clone()
2388 .expect("builtin tables cannot be serialized");
2389 let mut collections = table.collections.clone();
2390 let global_id = collections
2391 .remove(&RelationVersion::root())
2392 .expect("at least one version");
2393 (create_sql, global_id, collections)
2394 }
2395 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2396 CatalogItem::Source(source) => {
2397 assert!(
2398 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2399 "cannot serialize introspection/builtin sources",
2400 );
2401 let create_sql = source
2402 .create_sql
2403 .clone()
2404 .expect("builtin sources cannot be serialized");
2405 (create_sql, source.global_id, BTreeMap::new())
2406 }
2407 CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2408 CatalogItem::MaterializedView(mview) => {
2409 let mut collections = mview.collections.clone();
2410 let global_id = collections
2411 .remove(&RelationVersion::root())
2412 .expect("at least one version");
2413 (mview.create_sql.clone(), global_id, collections)
2414 }
2415 CatalogItem::Index(index) => {
2416 (index.create_sql.clone(), index.global_id, BTreeMap::new())
2417 }
2418 CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2419 CatalogItem::Type(typ) => {
2420 let create_sql = typ
2421 .create_sql
2422 .clone()
2423 .expect("builtin types cannot be serialized");
2424 (create_sql, typ.global_id, BTreeMap::new())
2425 }
2426 CatalogItem::Secret(secret) => {
2427 (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2428 }
2429 CatalogItem::Connection(connection) => (
2430 connection.create_sql.clone(),
2431 connection.global_id,
2432 BTreeMap::new(),
2433 ),
2434 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2435 CatalogItem::ContinualTask(ct) => {
2436 (ct.create_sql.clone(), ct.global_id, BTreeMap::new())
2437 }
2438 }
2439 }
2440
2441 pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2442 match self {
2443 CatalogItem::Table(mut table) => {
2444 let create_sql = table
2445 .create_sql
2446 .expect("builtin tables cannot be serialized");
2447 let global_id = table
2448 .collections
2449 .remove(&RelationVersion::root())
2450 .expect("at least one version");
2451 (create_sql, global_id, table.collections)
2452 }
2453 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2454 CatalogItem::Source(source) => {
2455 assert!(
2456 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2457 "cannot serialize introspection/builtin sources",
2458 );
2459 let create_sql = source
2460 .create_sql
2461 .expect("builtin sources cannot be serialized");
2462 (create_sql, source.global_id, BTreeMap::new())
2463 }
2464 CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2465 CatalogItem::MaterializedView(mut mview) => {
2466 let global_id = mview
2467 .collections
2468 .remove(&RelationVersion::root())
2469 .expect("at least one version");
2470 (mview.create_sql, global_id, mview.collections)
2471 }
2472 CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2473 CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2474 CatalogItem::Type(typ) => {
2475 let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2476 (create_sql, typ.global_id, BTreeMap::new())
2477 }
2478 CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2479 CatalogItem::Connection(connection) => {
2480 (connection.create_sql, connection.global_id, BTreeMap::new())
2481 }
2482 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2483 CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
2484 }
2485 }
2486
2487 pub fn global_id_for_version(&self, version: RelationVersionSelector) -> Option<GlobalId> {
2490 let collections = match self {
2491 CatalogItem::MaterializedView(mv) => &mv.collections,
2492 CatalogItem::Table(table) => &table.collections,
2493 CatalogItem::Source(source) => return Some(source.global_id),
2494 CatalogItem::Log(log) => return Some(log.global_id),
2495 CatalogItem::View(view) => return Some(view.global_id),
2496 CatalogItem::Sink(sink) => return Some(sink.global_id),
2497 CatalogItem::Index(index) => return Some(index.global_id),
2498 CatalogItem::Type(ty) => return Some(ty.global_id),
2499 CatalogItem::Func(func) => return Some(func.global_id),
2500 CatalogItem::Secret(secret) => return Some(secret.global_id),
2501 CatalogItem::Connection(conn) => return Some(conn.global_id),
2502 CatalogItem::ContinualTask(ct) => return Some(ct.global_id),
2503 };
2504 match version {
2505 RelationVersionSelector::Latest => collections.values().last().copied(),
2506 RelationVersionSelector::Specific(version) => collections.get(&version).copied(),
2507 }
2508 }
2509}
2510
2511impl CatalogEntry {
2512 pub fn desc_latest(
2517 &self,
2518 name: &FullItemName,
2519 ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
2520 self.item.desc(name, RelationVersionSelector::Latest)
2521 }
2522
2523 pub fn desc_opt_latest(&self) -> Option<Cow<'_, RelationDesc>> {
2526 self.item.desc_opt(RelationVersionSelector::Latest)
2527 }
2528
2529 pub fn has_columns(&self) -> bool {
2531 match self.item() {
2532 CatalogItem::Type(Type { details, .. }) => {
2533 matches!(details.typ, CatalogType::Record { .. })
2534 }
2535 _ => self.desc_opt_latest().is_some(),
2536 }
2537 }
2538
2539 pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2541 self.item.func(self)
2542 }
2543
2544 pub fn index(&self) -> Option<&Index> {
2546 match self.item() {
2547 CatalogItem::Index(idx) => Some(idx),
2548 _ => None,
2549 }
2550 }
2551
2552 pub fn materialized_view(&self) -> Option<&MaterializedView> {
2554 match self.item() {
2555 CatalogItem::MaterializedView(mv) => Some(mv),
2556 _ => None,
2557 }
2558 }
2559
2560 pub fn table(&self) -> Option<&Table> {
2562 match self.item() {
2563 CatalogItem::Table(tbl) => Some(tbl),
2564 _ => None,
2565 }
2566 }
2567
2568 pub fn source(&self) -> Option<&Source> {
2570 match self.item() {
2571 CatalogItem::Source(src) => Some(src),
2572 _ => None,
2573 }
2574 }
2575
2576 pub fn sink(&self) -> Option<&Sink> {
2578 match self.item() {
2579 CatalogItem::Sink(sink) => Some(sink),
2580 _ => None,
2581 }
2582 }
2583
2584 pub fn secret(&self) -> Option<&Secret> {
2586 match self.item() {
2587 CatalogItem::Secret(secret) => Some(secret),
2588 _ => None,
2589 }
2590 }
2591
2592 pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2593 match self.item() {
2594 CatalogItem::Connection(connection) => Ok(connection),
2595 _ => {
2596 let db_name = match self.name().qualifiers.database_spec {
2597 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2598 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2599 };
2600 Err(SqlCatalogError::UnknownConnection(format!(
2601 "{}{}.{}",
2602 db_name,
2603 self.name().qualifiers.schema_spec,
2604 self.name().item
2605 )))
2606 }
2607 }
2608 }
2609
2610 pub fn source_desc(
2613 &self,
2614 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2615 self.item.source_desc(self)
2616 }
2617
2618 pub fn is_connection(&self) -> bool {
2620 matches!(self.item(), CatalogItem::Connection(_))
2621 }
2622
2623 pub fn is_table(&self) -> bool {
2625 matches!(self.item(), CatalogItem::Table(_))
2626 }
2627
2628 pub fn is_source(&self) -> bool {
2631 matches!(self.item(), CatalogItem::Source(_))
2632 }
2633
2634 pub fn subsource_details(
2637 &self,
2638 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2639 match &self.item() {
2640 CatalogItem::Source(source) => match &source.data_source {
2641 DataSourceDesc::IngestionExport {
2642 ingestion_id,
2643 external_reference,
2644 details,
2645 data_config: _,
2646 } => Some((*ingestion_id, external_reference, details)),
2647 _ => None,
2648 },
2649 _ => None,
2650 }
2651 }
2652
2653 pub fn source_export_details(
2656 &self,
2657 ) -> Option<(
2658 CatalogItemId,
2659 &UnresolvedItemName,
2660 &SourceExportDetails,
2661 &SourceExportDataConfig<ReferencedConnection>,
2662 )> {
2663 match &self.item() {
2664 CatalogItem::Source(source) => match &source.data_source {
2665 DataSourceDesc::IngestionExport {
2666 ingestion_id,
2667 external_reference,
2668 details,
2669 data_config,
2670 } => Some((*ingestion_id, external_reference, details, data_config)),
2671 _ => None,
2672 },
2673 CatalogItem::Table(table) => match &table.data_source {
2674 TableDataSource::DataSource {
2675 desc:
2676 DataSourceDesc::IngestionExport {
2677 ingestion_id,
2678 external_reference,
2679 details,
2680 data_config,
2681 },
2682 timeline: _,
2683 } => Some((*ingestion_id, external_reference, details, data_config)),
2684 _ => None,
2685 },
2686 _ => None,
2687 }
2688 }
2689
2690 pub fn is_progress_source(&self) -> bool {
2692 self.item().is_progress_source()
2693 }
2694
2695 pub fn progress_id(&self) -> Option<CatalogItemId> {
2697 match &self.item() {
2698 CatalogItem::Source(source) => match &source.data_source {
2699 DataSourceDesc::Ingestion { .. } => Some(self.id),
2700 DataSourceDesc::OldSyntaxIngestion {
2701 progress_subsource, ..
2702 } => Some(*progress_subsource),
2703 DataSourceDesc::IngestionExport { .. }
2704 | DataSourceDesc::Introspection(_)
2705 | DataSourceDesc::Progress
2706 | DataSourceDesc::Webhook { .. } => None,
2707 },
2708 CatalogItem::Table(_)
2709 | CatalogItem::Log(_)
2710 | CatalogItem::View(_)
2711 | CatalogItem::MaterializedView(_)
2712 | CatalogItem::Sink(_)
2713 | CatalogItem::Index(_)
2714 | CatalogItem::Type(_)
2715 | CatalogItem::Func(_)
2716 | CatalogItem::Secret(_)
2717 | CatalogItem::Connection(_)
2718 | CatalogItem::ContinualTask(_) => None,
2719 }
2720 }
2721
2722 pub fn is_sink(&self) -> bool {
2724 matches!(self.item(), CatalogItem::Sink(_))
2725 }
2726
2727 pub fn is_materialized_view(&self) -> bool {
2729 matches!(self.item(), CatalogItem::MaterializedView(_))
2730 }
2731
2732 pub fn is_view(&self) -> bool {
2734 matches!(self.item(), CatalogItem::View(_))
2735 }
2736
2737 pub fn is_secret(&self) -> bool {
2739 matches!(self.item(), CatalogItem::Secret(_))
2740 }
2741
2742 pub fn is_introspection_source(&self) -> bool {
2744 matches!(self.item(), CatalogItem::Log(_))
2745 }
2746
2747 pub fn is_index(&self) -> bool {
2749 matches!(self.item(), CatalogItem::Index(_))
2750 }
2751
2752 pub fn is_continual_task(&self) -> bool {
2754 matches!(self.item(), CatalogItem::ContinualTask(_))
2755 }
2756
2757 pub fn is_relation(&self) -> bool {
2759 mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2760 }
2761
2762 pub fn references(&self) -> &ResolvedIds {
2765 self.item.references()
2766 }
2767
2768 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2774 self.item.uses()
2775 }
2776
2777 pub fn item(&self) -> &CatalogItem {
2779 &self.item
2780 }
2781
2782 pub fn id(&self) -> CatalogItemId {
2784 self.id
2785 }
2786
2787 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2789 self.item().global_ids()
2790 }
2791
2792 pub fn latest_global_id(&self) -> GlobalId {
2793 self.item().latest_global_id()
2794 }
2795
2796 pub fn oid(&self) -> u32 {
2798 self.oid
2799 }
2800
2801 pub fn name(&self) -> &QualifiedItemName {
2803 &self.name
2804 }
2805
2806 pub fn referenced_by(&self) -> &[CatalogItemId] {
2808 &self.referenced_by
2809 }
2810
2811 pub fn used_by(&self) -> &[CatalogItemId] {
2813 &self.used_by
2814 }
2815
2816 pub fn conn_id(&self) -> Option<&ConnectionId> {
2819 self.item.conn_id()
2820 }
2821
2822 pub fn owner_id(&self) -> &RoleId {
2824 &self.owner_id
2825 }
2826
2827 pub fn privileges(&self) -> &PrivilegeMap {
2829 &self.privileges
2830 }
2831}
2832
2833#[derive(Debug, Clone, Default)]
2834pub struct CommentsMap {
2835 map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
2836}
2837
2838impl CommentsMap {
2839 pub fn update_comment(
2840 &mut self,
2841 object_id: CommentObjectId,
2842 sub_component: Option<usize>,
2843 comment: Option<String>,
2844 ) -> Option<String> {
2845 let object_comments = self.map.entry(object_id).or_default();
2846
2847 let (empty, prev) = if let Some(comment) = comment {
2849 let prev = object_comments.insert(sub_component, comment);
2850 (false, prev)
2851 } else {
2852 let prev = object_comments.remove(&sub_component);
2853 (object_comments.is_empty(), prev)
2854 };
2855
2856 if empty {
2858 self.map.remove(&object_id);
2859 }
2860
2861 prev
2863 }
2864
2865 pub fn drop_comments(
2871 &mut self,
2872 object_ids: &BTreeSet<CommentObjectId>,
2873 ) -> Vec<(CommentObjectId, Option<usize>, String)> {
2874 let mut removed_comments = Vec::new();
2875
2876 for object_id in object_ids {
2877 if let Some(comments) = self.map.remove(object_id) {
2878 let removed = comments
2879 .into_iter()
2880 .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
2881 removed_comments.extend(removed);
2882 }
2883 }
2884
2885 removed_comments
2886 }
2887
2888 pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
2889 self.map
2890 .iter()
2891 .map(|(id, comments)| {
2892 comments
2893 .iter()
2894 .map(|(pos, comment)| (*id, *pos, comment.as_str()))
2895 })
2896 .flatten()
2897 }
2898
2899 pub fn get_object_comments(
2900 &self,
2901 object_id: CommentObjectId,
2902 ) -> Option<&BTreeMap<Option<usize>, String>> {
2903 self.map.get(&object_id)
2904 }
2905}
2906
2907impl Serialize for CommentsMap {
2908 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2909 where
2910 S: serde::Serializer,
2911 {
2912 let comment_count = self
2913 .map
2914 .iter()
2915 .map(|(_object_id, comments)| comments.len())
2916 .sum();
2917
2918 let mut seq = serializer.serialize_seq(Some(comment_count))?;
2919 for (object_id, sub) in &self.map {
2920 for (sub_component, comment) in sub {
2921 seq.serialize_element(&(
2922 format!("{object_id:?}"),
2923 format!("{sub_component:?}"),
2924 comment,
2925 ))?;
2926 }
2927 }
2928 seq.end()
2929 }
2930}
2931
2932#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
2933pub struct DefaultPrivileges {
2934 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
2935 privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
2936}
2937
2938#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
2941struct RoleDefaultPrivileges(
2942 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
2944 BTreeMap<RoleId, DefaultPrivilegeAclItem>,
2945);
2946
2947impl Deref for RoleDefaultPrivileges {
2948 type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
2949
2950 fn deref(&self) -> &Self::Target {
2951 &self.0
2952 }
2953}
2954
2955impl DerefMut for RoleDefaultPrivileges {
2956 fn deref_mut(&mut self) -> &mut Self::Target {
2957 &mut self.0
2958 }
2959}
2960
2961impl DefaultPrivileges {
2962 pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
2964 if privilege.acl_mode.is_empty() {
2965 return;
2966 }
2967
2968 let privileges = self.privileges.entry(object).or_default();
2969 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2970 default_privilege.acl_mode |= privilege.acl_mode;
2971 } else {
2972 privileges.insert(privilege.grantee, privilege);
2973 }
2974 }
2975
2976 pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
2978 if let Some(privileges) = self.privileges.get_mut(object) {
2979 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2980 default_privilege.acl_mode =
2981 default_privilege.acl_mode.difference(privilege.acl_mode);
2982 if default_privilege.acl_mode.is_empty() {
2983 privileges.remove(&privilege.grantee);
2984 }
2985 }
2986 if privileges.is_empty() {
2987 self.privileges.remove(object);
2988 }
2989 }
2990 }
2991
2992 pub fn get_privileges_for_grantee(
2995 &self,
2996 object: &DefaultPrivilegeObject,
2997 grantee: &RoleId,
2998 ) -> Option<&AclMode> {
2999 self.privileges
3000 .get(object)
3001 .and_then(|privileges| privileges.get(grantee))
3002 .map(|privilege| &privilege.acl_mode)
3003 }
3004
3005 pub fn get_applicable_privileges(
3007 &self,
3008 role_id: RoleId,
3009 database_id: Option<DatabaseId>,
3010 schema_id: Option<SchemaId>,
3011 object_type: mz_sql::catalog::ObjectType,
3012 ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
3013 let privilege_object_type = if object_type.is_relation() {
3017 mz_sql::catalog::ObjectType::Table
3018 } else {
3019 object_type
3020 };
3021 let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
3022
3023 [
3027 DefaultPrivilegeObject {
3028 role_id,
3029 database_id,
3030 schema_id,
3031 object_type: privilege_object_type,
3032 },
3033 DefaultPrivilegeObject {
3034 role_id,
3035 database_id,
3036 schema_id: None,
3037 object_type: privilege_object_type,
3038 },
3039 DefaultPrivilegeObject {
3040 role_id,
3041 database_id: None,
3042 schema_id: None,
3043 object_type: privilege_object_type,
3044 },
3045 DefaultPrivilegeObject {
3046 role_id: RoleId::Public,
3047 database_id,
3048 schema_id,
3049 object_type: privilege_object_type,
3050 },
3051 DefaultPrivilegeObject {
3052 role_id: RoleId::Public,
3053 database_id,
3054 schema_id: None,
3055 object_type: privilege_object_type,
3056 },
3057 DefaultPrivilegeObject {
3058 role_id: RoleId::Public,
3059 database_id: None,
3060 schema_id: None,
3061 object_type: privilege_object_type,
3062 },
3063 ]
3064 .into_iter()
3065 .filter_map(|object| self.privileges.get(&object))
3066 .flat_map(|acl_map| acl_map.values())
3067 .fold(
3069 BTreeMap::new(),
3070 |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
3071 let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
3072 *accum_acl_mode |= *acl_mode;
3073 accum
3074 },
3075 )
3076 .into_iter()
3077 .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
3082 .filter(|(_, acl_mode)| !acl_mode.is_empty())
3084 .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
3085 grantee: *grantee,
3086 acl_mode,
3087 })
3088 }
3089
3090 pub fn iter(
3091 &self,
3092 ) -> impl Iterator<
3093 Item = (
3094 &DefaultPrivilegeObject,
3095 impl Iterator<Item = &DefaultPrivilegeAclItem>,
3096 ),
3097 > {
3098 self.privileges
3099 .iter()
3100 .map(|(object, acl_map)| (object, acl_map.values()))
3101 }
3102}
3103
3104#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3105pub struct ClusterConfig {
3106 pub variant: ClusterVariant,
3107 pub workload_class: Option<String>,
3108}
3109
3110impl ClusterConfig {
3111 pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
3112 match &self.variant {
3113 ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
3114 ClusterVariant::Unmanaged => None,
3115 }
3116 }
3117}
3118
3119impl From<ClusterConfig> for durable::ClusterConfig {
3120 fn from(config: ClusterConfig) -> Self {
3121 Self {
3122 variant: config.variant.into(),
3123 workload_class: config.workload_class,
3124 }
3125 }
3126}
3127
3128impl From<durable::ClusterConfig> for ClusterConfig {
3129 fn from(config: durable::ClusterConfig) -> Self {
3130 Self {
3131 variant: config.variant.into(),
3132 workload_class: config.workload_class,
3133 }
3134 }
3135}
3136
3137#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3138pub struct ClusterVariantManaged {
3139 pub size: String,
3140 pub availability_zones: Vec<String>,
3141 pub logging: ReplicaLogging,
3142 pub replication_factor: u32,
3143 pub optimizer_feature_overrides: OptimizerFeatureOverrides,
3144 pub schedule: ClusterSchedule,
3145}
3146
3147impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3148 fn from(managed: ClusterVariantManaged) -> Self {
3149 Self {
3150 size: managed.size,
3151 availability_zones: managed.availability_zones,
3152 logging: managed.logging,
3153 replication_factor: managed.replication_factor,
3154 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3155 schedule: managed.schedule,
3156 }
3157 }
3158}
3159
3160impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3161 fn from(managed: durable::ClusterVariantManaged) -> Self {
3162 Self {
3163 size: managed.size,
3164 availability_zones: managed.availability_zones,
3165 logging: managed.logging,
3166 replication_factor: managed.replication_factor,
3167 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3168 schedule: managed.schedule,
3169 }
3170 }
3171}
3172
3173#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3174pub enum ClusterVariant {
3175 Managed(ClusterVariantManaged),
3176 Unmanaged,
3177}
3178
3179impl From<ClusterVariant> for durable::ClusterVariant {
3180 fn from(variant: ClusterVariant) -> Self {
3181 match variant {
3182 ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3183 ClusterVariant::Unmanaged => Self::Unmanaged,
3184 }
3185 }
3186}
3187
3188impl From<durable::ClusterVariant> for ClusterVariant {
3189 fn from(variant: durable::ClusterVariant) -> Self {
3190 match variant {
3191 durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3192 durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3193 }
3194 }
3195}
3196
3197impl mz_sql::catalog::CatalogDatabase for Database {
3198 fn name(&self) -> &str {
3199 &self.name
3200 }
3201
3202 fn id(&self) -> DatabaseId {
3203 self.id
3204 }
3205
3206 fn has_schemas(&self) -> bool {
3207 !self.schemas_by_name.is_empty()
3208 }
3209
3210 fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
3211 &self.schemas_by_name
3212 }
3213
3214 #[allow(clippy::as_conversions)]
3216 fn schemas(&self) -> Vec<&dyn CatalogSchema> {
3217 self.schemas_by_id
3218 .values()
3219 .map(|schema| schema as &dyn CatalogSchema)
3220 .collect()
3221 }
3222
3223 fn owner_id(&self) -> RoleId {
3224 self.owner_id
3225 }
3226
3227 fn privileges(&self) -> &PrivilegeMap {
3228 &self.privileges
3229 }
3230}
3231
3232impl mz_sql::catalog::CatalogSchema for Schema {
3233 fn database(&self) -> &ResolvedDatabaseSpecifier {
3234 &self.name.database
3235 }
3236
3237 fn name(&self) -> &QualifiedSchemaName {
3238 &self.name
3239 }
3240
3241 fn id(&self) -> &SchemaSpecifier {
3242 &self.id
3243 }
3244
3245 fn has_items(&self) -> bool {
3246 !self.items.is_empty()
3247 }
3248
3249 fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
3250 Box::new(
3251 self.items
3252 .values()
3253 .chain(self.functions.values())
3254 .chain(self.types.values())
3255 .copied(),
3256 )
3257 }
3258
3259 fn owner_id(&self) -> RoleId {
3260 self.owner_id
3261 }
3262
3263 fn privileges(&self) -> &PrivilegeMap {
3264 &self.privileges
3265 }
3266}
3267
3268impl mz_sql::catalog::CatalogRole for Role {
3269 fn name(&self) -> &str {
3270 &self.name
3271 }
3272
3273 fn id(&self) -> RoleId {
3274 self.id
3275 }
3276
3277 fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
3278 &self.membership.map
3279 }
3280
3281 fn attributes(&self) -> &RoleAttributes {
3282 &self.attributes
3283 }
3284
3285 fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
3286 &self.vars.map
3287 }
3288}
3289
3290impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
3291 fn name(&self) -> &str {
3292 &self.name
3293 }
3294
3295 fn id(&self) -> NetworkPolicyId {
3296 self.id
3297 }
3298
3299 fn owner_id(&self) -> RoleId {
3300 self.owner_id
3301 }
3302
3303 fn privileges(&self) -> &PrivilegeMap {
3304 &self.privileges
3305 }
3306}
3307
3308impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
3309 fn name(&self) -> &str {
3310 &self.name
3311 }
3312
3313 fn id(&self) -> ClusterId {
3314 self.id
3315 }
3316
3317 fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
3318 &self.bound_objects
3319 }
3320
3321 fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
3322 &self.replica_id_by_name_
3323 }
3324
3325 #[allow(clippy::as_conversions)]
3327 fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
3328 self.replicas()
3329 .map(|replica| replica as &dyn CatalogClusterReplica)
3330 .collect()
3331 }
3332
3333 fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
3334 self.replica(id).expect("catalog out of sync")
3335 }
3336
3337 fn owner_id(&self) -> RoleId {
3338 self.owner_id
3339 }
3340
3341 fn privileges(&self) -> &PrivilegeMap {
3342 &self.privileges
3343 }
3344
3345 fn is_managed(&self) -> bool {
3346 self.is_managed()
3347 }
3348
3349 fn managed_size(&self) -> Option<&str> {
3350 match &self.config.variant {
3351 ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
3352 _ => None,
3353 }
3354 }
3355
3356 fn schedule(&self) -> Option<&ClusterSchedule> {
3357 match &self.config.variant {
3358 ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
3359 _ => None,
3360 }
3361 }
3362
3363 fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
3364 self.try_to_plan()
3365 }
3366}
3367
3368impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
3369 fn name(&self) -> &str {
3370 &self.name
3371 }
3372
3373 fn cluster_id(&self) -> ClusterId {
3374 self.cluster_id
3375 }
3376
3377 fn replica_id(&self) -> ReplicaId {
3378 self.replica_id
3379 }
3380
3381 fn owner_id(&self) -> RoleId {
3382 self.owner_id
3383 }
3384
3385 fn internal(&self) -> bool {
3386 self.config.location.internal()
3387 }
3388}
3389
3390impl mz_sql::catalog::CatalogItem for CatalogEntry {
3391 fn name(&self) -> &QualifiedItemName {
3392 self.name()
3393 }
3394
3395 fn id(&self) -> CatalogItemId {
3396 self.id()
3397 }
3398
3399 fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3400 Box::new(self.global_ids())
3401 }
3402
3403 fn oid(&self) -> u32 {
3404 self.oid()
3405 }
3406
3407 fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3408 self.func()
3409 }
3410
3411 fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3412 self.source_desc()
3413 }
3414
3415 fn connection(
3416 &self,
3417 ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3418 {
3419 Ok(self.connection()?.details.to_connection())
3420 }
3421
3422 fn create_sql(&self) -> &str {
3423 match self.item() {
3424 CatalogItem::Table(Table { create_sql, .. }) => {
3425 create_sql.as_deref().unwrap_or("<builtin>")
3426 }
3427 CatalogItem::Source(Source { create_sql, .. }) => {
3428 create_sql.as_deref().unwrap_or("<builtin>")
3429 }
3430 CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3431 CatalogItem::View(View { create_sql, .. }) => create_sql,
3432 CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3433 CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3434 CatalogItem::Type(Type { create_sql, .. }) => {
3435 create_sql.as_deref().unwrap_or("<builtin>")
3436 }
3437 CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3438 CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3439 CatalogItem::Func(_) => "<builtin>",
3440 CatalogItem::Log(_) => "<builtin>",
3441 CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
3442 }
3443 }
3444
3445 fn item_type(&self) -> SqlCatalogItemType {
3446 self.item().typ()
3447 }
3448
3449 fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3450 if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3451 Some((keys, *on))
3452 } else {
3453 None
3454 }
3455 }
3456
3457 fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3458 if let CatalogItem::Table(Table {
3459 data_source: TableDataSource::TableWrites { defaults },
3460 ..
3461 }) = self.item()
3462 {
3463 Some(defaults.as_slice())
3464 } else {
3465 None
3466 }
3467 }
3468
3469 fn replacement_target(&self) -> Option<CatalogItemId> {
3470 if let CatalogItem::MaterializedView(mv) = self.item() {
3471 mv.replacement_target
3472 } else {
3473 None
3474 }
3475 }
3476
3477 fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3478 if let CatalogItem::Type(Type { details, .. }) = self.item() {
3479 Some(details)
3480 } else {
3481 None
3482 }
3483 }
3484
3485 fn references(&self) -> &ResolvedIds {
3486 self.references()
3487 }
3488
3489 fn uses(&self) -> BTreeSet<CatalogItemId> {
3490 self.uses()
3491 }
3492
3493 fn referenced_by(&self) -> &[CatalogItemId] {
3494 self.referenced_by()
3495 }
3496
3497 fn used_by(&self) -> &[CatalogItemId] {
3498 self.used_by()
3499 }
3500
3501 fn subsource_details(
3502 &self,
3503 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3504 self.subsource_details()
3505 }
3506
3507 fn source_export_details(
3508 &self,
3509 ) -> Option<(
3510 CatalogItemId,
3511 &UnresolvedItemName,
3512 &SourceExportDetails,
3513 &SourceExportDataConfig<ReferencedConnection>,
3514 )> {
3515 self.source_export_details()
3516 }
3517
3518 fn is_progress_source(&self) -> bool {
3519 self.is_progress_source()
3520 }
3521
3522 fn progress_id(&self) -> Option<CatalogItemId> {
3523 self.progress_id()
3524 }
3525
3526 fn owner_id(&self) -> RoleId {
3527 self.owner_id
3528 }
3529
3530 fn privileges(&self) -> &PrivilegeMap {
3531 &self.privileges
3532 }
3533
3534 fn cluster_id(&self) -> Option<ClusterId> {
3535 self.item().cluster_id()
3536 }
3537
3538 fn at_version(
3539 &self,
3540 version: RelationVersionSelector,
3541 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3542 Box::new(CatalogCollectionEntry {
3543 entry: self.clone(),
3544 version,
3545 })
3546 }
3547
3548 fn latest_version(&self) -> Option<RelationVersion> {
3549 self.table().map(|t| t.desc.latest_version())
3550 }
3551}
3552
3553#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
3555pub struct StateUpdate {
3556 pub kind: StateUpdateKind,
3557 pub ts: Timestamp,
3558 pub diff: StateDiff,
3559}
3560
3561#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
3565pub enum StateUpdateKind {
3566 Role(durable::objects::Role),
3567 RoleAuth(durable::objects::RoleAuth),
3568 Database(durable::objects::Database),
3569 Schema(durable::objects::Schema),
3570 DefaultPrivilege(durable::objects::DefaultPrivilege),
3571 SystemPrivilege(MzAclItem),
3572 SystemConfiguration(durable::objects::SystemConfiguration),
3573 Cluster(durable::objects::Cluster),
3574 NetworkPolicy(durable::objects::NetworkPolicy),
3575 IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3576 ClusterReplica(durable::objects::ClusterReplica),
3577 SourceReferences(durable::objects::SourceReferences),
3578 SystemObjectMapping(durable::objects::SystemObjectMapping),
3579 TemporaryItem(TemporaryItem),
3583 Item(durable::objects::Item),
3584 Comment(durable::objects::Comment),
3585 AuditLog(durable::objects::AuditLog),
3586 StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3588 UnfinalizedShard(durable::objects::UnfinalizedShard),
3589}
3590
/// Direction of a [`StateUpdate`]. Corresponds to a [`Diff`] of plus or
/// minus one.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub enum StateDiff {
    /// The object is removed; corresponds to [`Diff::MINUS_ONE`].
    Retraction,
    /// The object is added; corresponds to [`Diff::ONE`].
    Addition,
}
3597
3598impl From<StateDiff> for Diff {
3599 fn from(diff: StateDiff) -> Self {
3600 match diff {
3601 StateDiff::Retraction => Diff::MINUS_ONE,
3602 StateDiff::Addition => Diff::ONE,
3603 }
3604 }
3605}
3606impl TryFrom<Diff> for StateDiff {
3607 type Error = String;
3608
3609 fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3610 match diff {
3611 Diff::MINUS_ONE => Ok(Self::Retraction),
3612 Diff::ONE => Ok(Self::Addition),
3613 diff => Err(format!("invalid diff {diff}")),
3614 }
3615 }
3616}
3617
3618#[derive(Debug, Clone, Ord, PartialOrd, PartialEq, Eq)]
3620pub struct TemporaryItem {
3621 pub id: CatalogItemId,
3622 pub oid: u32,
3623 pub global_id: GlobalId,
3624 pub schema_id: SchemaId,
3625 pub name: String,
3626 pub conn_id: Option<ConnectionId>,
3627 pub create_sql: String,
3628 pub owner_id: RoleId,
3629 pub privileges: Vec<MzAclItem>,
3630 pub extra_versions: BTreeMap<RelationVersion, GlobalId>,
3631}
3632
3633impl From<CatalogEntry> for TemporaryItem {
3634 fn from(entry: CatalogEntry) -> Self {
3635 let conn_id = entry.conn_id().cloned();
3636 let (create_sql, global_id, extra_versions) = entry.item.to_serialized();
3637
3638 TemporaryItem {
3639 id: entry.id,
3640 oid: entry.oid,
3641 global_id,
3642 schema_id: entry.name.qualifiers.schema_spec.into(),
3643 name: entry.name.item,
3644 conn_id,
3645 create_sql,
3646 owner_id: entry.owner_id,
3647 privileges: entry.privileges.into_all_values().collect(),
3648 extra_versions,
3649 }
3650 }
3651}
3652
3653impl TemporaryItem {
3654 pub fn item_type(&self) -> CatalogItemType {
3655 item_type(&self.create_sql)
3656 }
3657}
3658
3659#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
3661pub enum BootstrapStateUpdateKind {
3662 Role(durable::objects::Role),
3663 RoleAuth(durable::objects::RoleAuth),
3664 Database(durable::objects::Database),
3665 Schema(durable::objects::Schema),
3666 DefaultPrivilege(durable::objects::DefaultPrivilege),
3667 SystemPrivilege(MzAclItem),
3668 SystemConfiguration(durable::objects::SystemConfiguration),
3669 Cluster(durable::objects::Cluster),
3670 NetworkPolicy(durable::objects::NetworkPolicy),
3671 IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3672 ClusterReplica(durable::objects::ClusterReplica),
3673 SourceReferences(durable::objects::SourceReferences),
3674 SystemObjectMapping(durable::objects::SystemObjectMapping),
3675 Item(durable::objects::Item),
3676 Comment(durable::objects::Comment),
3677 AuditLog(durable::objects::AuditLog),
3678 StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3680 UnfinalizedShard(durable::objects::UnfinalizedShard),
3681}
3682
3683impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3684 fn from(value: BootstrapStateUpdateKind) -> Self {
3685 match value {
3686 BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3687 BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3688 BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3689 BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3690 BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3691 StateUpdateKind::DefaultPrivilege(kind)
3692 }
3693 BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3694 StateUpdateKind::SystemPrivilege(kind)
3695 }
3696 BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3697 StateUpdateKind::SystemConfiguration(kind)
3698 }
3699 BootstrapStateUpdateKind::SourceReferences(kind) => {
3700 StateUpdateKind::SourceReferences(kind)
3701 }
3702 BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3703 BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3704 BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3705 StateUpdateKind::IntrospectionSourceIndex(kind)
3706 }
3707 BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3708 BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3709 StateUpdateKind::SystemObjectMapping(kind)
3710 }
3711 BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3712 BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3713 BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3714 BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3715 StateUpdateKind::StorageCollectionMetadata(kind)
3716 }
3717 BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3718 StateUpdateKind::UnfinalizedShard(kind)
3719 }
3720 }
3721 }
3722}
3723
3724impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3725 type Error = TemporaryItem;
3726
3727 fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3728 match value {
3729 StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3730 StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3731 StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3732 StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3733 StateUpdateKind::DefaultPrivilege(kind) => {
3734 Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3735 }
3736 StateUpdateKind::SystemPrivilege(kind) => {
3737 Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3738 }
3739 StateUpdateKind::SystemConfiguration(kind) => {
3740 Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3741 }
3742 StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3743 StateUpdateKind::NetworkPolicy(kind) => {
3744 Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3745 }
3746 StateUpdateKind::IntrospectionSourceIndex(kind) => {
3747 Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3748 }
3749 StateUpdateKind::ClusterReplica(kind) => {
3750 Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3751 }
3752 StateUpdateKind::SourceReferences(kind) => {
3753 Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3754 }
3755 StateUpdateKind::SystemObjectMapping(kind) => {
3756 Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3757 }
3758 StateUpdateKind::TemporaryItem(kind) => Err(kind),
3759 StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3760 StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3761 StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3762 StateUpdateKind::StorageCollectionMetadata(kind) => {
3763 Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3764 }
3765 StateUpdateKind::UnfinalizedShard(kind) => {
3766 Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3767 }
3768 }
3769 }
3770}