1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18
19use chrono::{DateTime, Utc};
20use mz_adapter_types::compaction::CompactionWindow;
21use mz_adapter_types::connection::ConnectionId;
22use mz_compute_client::logging::LogVariant;
23use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
24use mz_controller_types::{ClusterId, ReplicaId};
25use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
26use mz_ore::collections::CollectionExt;
27use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
28use mz_repr::network_policy_id::NetworkPolicyId;
29use mz_repr::optimize::OptimizerFeatureOverrides;
30use mz_repr::refresh_schedule::RefreshSchedule;
31use mz_repr::role_id::RoleId;
32use mz_repr::{
33 CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
34 RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
35};
36use mz_sql::ast::display::AstDisplay;
37use mz_sql::ast::{
38 ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
39 UnresolvedItemName, Value, WithOptionValue,
40};
41use mz_sql::catalog::{
42 CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
43 CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogTypeDetails,
44 DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, RoleAttributes, RoleMembership,
45 RoleVars, SystemObjectType,
46};
47use mz_sql::names::{
48 Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
49 QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
50};
51use mz_sql::plan::{
52 ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
53 CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
54 HirRelationExpr, NetworkPolicyRule, PlanError, WebhookBodyFormat, WebhookHeaders,
55 WebhookValidation,
56};
57use mz_sql::rbac;
58use mz_sql::session::vars::OwnedVarInput;
59use mz_storage_client::controller::IntrospectionType;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
62use mz_storage_types::sources::load_generator::LoadGenerator;
63use mz_storage_types::sources::{
64 GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
65 SourceExportDetails, Timeline,
66};
67use serde::ser::SerializeSeq;
68use serde::{Deserialize, Serialize};
69use timely::progress::Antichain;
70use tracing::debug;
71
72use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
73use crate::durable;
74
/// A conversion from `T` that can also be applied in place to an existing value.
///
/// The supertrait bound means every implementor can also be built from
/// scratch with `From<T>`.
pub trait UpdateFrom<T>: From<T> {
    /// Updates `self` using the contents of `from`, consuming `from`.
    fn update_from(&mut self, from: T);
}
79
/// A database together with lookup tables for the schemas it contains.
///
/// Converts to and from `durable::Database`. The two schema maps have no
/// counterpart in the durable type: they are empty after converting from it
/// and are discarded when converting back.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    pub name: String,
    pub id: DatabaseId,
    pub oid: u32,
    /// Schemas keyed by ID; serialized through `map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Schema name to schema ID lookup.
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
91
92impl From<Database> for durable::Database {
93 fn from(database: Database) -> durable::Database {
94 durable::Database {
95 id: database.id,
96 oid: database.oid,
97 name: database.name,
98 owner_id: database.owner_id,
99 privileges: database.privileges.into_all_values().collect(),
100 }
101 }
102}
103
104impl From<durable::Database> for Database {
105 fn from(
106 durable::Database {
107 id,
108 oid,
109 name,
110 owner_id,
111 privileges,
112 }: durable::Database,
113 ) -> Database {
114 Database {
115 id,
116 oid,
117 schemas_by_id: BTreeMap::new(),
118 schemas_by_name: BTreeMap::new(),
119 name,
120 owner_id,
121 privileges: PrivilegeMap::from_mz_acl_items(privileges),
122 }
123 }
124}
125
126impl UpdateFrom<durable::Database> for Database {
127 fn update_from(
128 &mut self,
129 durable::Database {
130 id,
131 oid,
132 name,
133 owner_id,
134 privileges,
135 }: durable::Database,
136 ) {
137 self.id = id;
138 self.oid = oid;
139 self.name = name;
140 self.owner_id = owner_id;
141 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
142 }
143}
144
/// A schema together with name lookup tables for the objects it contains.
///
/// Converts to and from `durable::Schema`. The `items`, `functions`, and
/// `types` maps have no counterpart in the durable type: they are empty after
/// converting from it and are discarded when converting back.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
    /// Schema name qualified by its database specifier.
    pub name: QualifiedSchemaName,
    pub id: SchemaSpecifier,
    pub oid: u32,
    /// Name to item ID lookup.
    // NOTE(review): presumably holds everything that is neither a function
    // nor a type — confirm against the code that populates these maps.
    pub items: BTreeMap<String, CatalogItemId>,
    /// Name to item ID lookup for functions.
    pub functions: BTreeMap<String, CatalogItemId>,
    /// Name to item ID lookup for types.
    pub types: BTreeMap<String, CatalogItemId>,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
156
157impl From<Schema> for durable::Schema {
158 fn from(schema: Schema) -> durable::Schema {
159 durable::Schema {
160 id: schema.id.into(),
161 oid: schema.oid,
162 name: schema.name.schema,
163 database_id: schema.name.database.id(),
164 owner_id: schema.owner_id,
165 privileges: schema.privileges.into_all_values().collect(),
166 }
167 }
168}
169
170impl From<durable::Schema> for Schema {
171 fn from(
172 durable::Schema {
173 id,
174 oid,
175 name,
176 database_id,
177 owner_id,
178 privileges,
179 }: durable::Schema,
180 ) -> Schema {
181 Schema {
182 name: QualifiedSchemaName {
183 database: database_id.into(),
184 schema: name,
185 },
186 id: id.into(),
187 oid,
188 items: BTreeMap::new(),
189 functions: BTreeMap::new(),
190 types: BTreeMap::new(),
191 owner_id,
192 privileges: PrivilegeMap::from_mz_acl_items(privileges),
193 }
194 }
195}
196
197impl UpdateFrom<durable::Schema> for Schema {
198 fn update_from(
199 &mut self,
200 durable::Schema {
201 id,
202 oid,
203 name,
204 database_id,
205 owner_id,
206 privileges,
207 }: durable::Schema,
208 ) {
209 self.name = QualifiedSchemaName {
210 database: database_id.into(),
211 schema: name,
212 };
213 self.id = id.into();
214 self.oid = oid;
215 self.owner_id = owner_id;
216 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
217 }
218}
219
/// A role: its identity, attributes, memberships, and variable settings.
///
/// Converts field-for-field to and from `durable::Role`.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    pub name: String,
    pub id: RoleId,
    pub oid: u32,
    pub attributes: RoleAttributes,
    pub membership: RoleMembership,
    /// Variable settings for this role; iterate them with [`Role::vars`].
    pub vars: RoleVars,
}
229
230impl Role {
231 pub fn is_user(&self) -> bool {
232 self.id.is_user()
233 }
234
235 pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
236 self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
237 }
238}
239
240impl From<Role> for durable::Role {
241 fn from(role: Role) -> durable::Role {
242 durable::Role {
243 id: role.id,
244 oid: role.oid,
245 name: role.name,
246 attributes: role.attributes,
247 membership: role.membership,
248 vars: role.vars,
249 }
250 }
251}
252
253impl From<durable::Role> for Role {
254 fn from(
255 durable::Role {
256 id,
257 oid,
258 name,
259 attributes,
260 membership,
261 vars,
262 }: durable::Role,
263 ) -> Self {
264 Role {
265 name,
266 id,
267 oid,
268 attributes,
269 membership,
270 vars,
271 }
272 }
273}
274
275impl UpdateFrom<durable::Role> for Role {
276 fn update_from(
277 &mut self,
278 durable::Role {
279 id,
280 oid,
281 name,
282 attributes,
283 membership,
284 vars,
285 }: durable::Role,
286 ) {
287 self.id = id;
288 self.oid = oid;
289 self.name = name;
290 self.attributes = attributes;
291 self.membership = membership;
292 self.vars = vars;
293 }
294}
295
/// Authentication data for a role.
///
/// Converts field-for-field to and from `durable::RoleAuth`.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RoleAuth {
    pub role_id: RoleId,
    /// `None` when no password hash is recorded for the role.
    pub password_hash: Option<String>,
    // NOTE(review): the unit and epoch of this timestamp are not visible
    // here — confirm against the code that writes it.
    pub updated_at: u64,
}
302
303impl From<RoleAuth> for durable::RoleAuth {
304 fn from(role_auth: RoleAuth) -> durable::RoleAuth {
305 durable::RoleAuth {
306 role_id: role_auth.role_id,
307 password_hash: role_auth.password_hash,
308 updated_at: role_auth.updated_at,
309 }
310 }
311}
312
313impl From<durable::RoleAuth> for RoleAuth {
314 fn from(
315 durable::RoleAuth {
316 role_id,
317 password_hash,
318 updated_at,
319 }: durable::RoleAuth,
320 ) -> RoleAuth {
321 RoleAuth {
322 role_id,
323 password_hash,
324 updated_at,
325 }
326 }
327}
328
329impl UpdateFrom<durable::RoleAuth> for RoleAuth {
330 fn update_from(&mut self, from: durable::RoleAuth) {
331 self.role_id = from.role_id;
332 self.password_hash = from.password_hash;
333 }
334}
335
/// A cluster together with its replicas, bound objects, and log indexes.
///
/// Only `id`, `name`, `owner_id`, `privileges`, and `config` round-trip
/// through `durable::Cluster`; the remaining collections are empty after
/// converting from the durable form.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    pub name: String,
    pub id: ClusterId,
    pub config: ClusterConfig,
    /// Global ID recorded for each log variant. Not serialized.
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    // NOTE(review): presumably the items installed on this cluster — confirm
    // against the code that populates this set.
    pub bound_objects: BTreeSet<CatalogItemId>,
    /// Replica name to replica ID lookup; read through [`Cluster::replica_id`].
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    /// Replicas keyed by ID; read through [`Cluster::replica`] and
    /// [`Cluster::replicas`]. Serialized through `map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
352
353impl Cluster {
354 pub fn role(&self) -> ClusterRole {
356 if self.name == MZ_SYSTEM_CLUSTER.name {
359 ClusterRole::SystemCritical
360 } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
361 ClusterRole::System
362 } else {
363 ClusterRole::User
364 }
365 }
366
367 pub fn is_managed(&self) -> bool {
369 matches!(self.config.variant, ClusterVariant::Managed { .. })
370 }
371
372 pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
374 self.replicas().filter(|r| !r.config.location.internal())
375 }
376
377 pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
379 self.replicas_by_id_.values()
380 }
381
382 pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
384 self.replicas_by_id_.get(&replica_id)
385 }
386
387 pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
389 self.replica_id_by_name_.get(name).copied()
390 }
391
392 pub fn availability_zones(&self) -> Option<&[String]> {
394 match &self.config.variant {
395 ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
396 ClusterVariant::Unmanaged => None,
397 }
398 }
399
400 pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
401 let name = self.name.clone();
402 let variant = match &self.config.variant {
403 ClusterVariant::Managed(ClusterVariantManaged {
404 size,
405 availability_zones,
406 logging,
407 replication_factor,
408 optimizer_feature_overrides,
409 schedule,
410 }) => {
411 let introspection = match logging {
412 ReplicaLogging {
413 log_logging,
414 interval: Some(interval),
415 } => Some(ComputeReplicaIntrospectionConfig {
416 debugging: *log_logging,
417 interval: interval.clone(),
418 }),
419 ReplicaLogging {
420 log_logging: _,
421 interval: None,
422 } => None,
423 };
424 let compute = ComputeReplicaConfig { introspection };
425 CreateClusterVariant::Managed(CreateClusterManagedPlan {
426 replication_factor: replication_factor.clone(),
427 size: size.clone(),
428 availability_zones: availability_zones.clone(),
429 compute,
430 optimizer_feature_overrides: optimizer_feature_overrides.clone(),
431 schedule: schedule.clone(),
432 })
433 }
434 ClusterVariant::Unmanaged => {
435 return Err(PlanError::Unsupported {
438 feature: "SHOW CREATE for unmanaged clusters".to_string(),
439 discussion_no: None,
440 });
441 }
442 };
443 let workload_class = self.config.workload_class.clone();
444 Ok(CreateClusterPlan {
445 name,
446 variant,
447 workload_class,
448 })
449 }
450}
451
452impl From<Cluster> for durable::Cluster {
453 fn from(cluster: Cluster) -> durable::Cluster {
454 durable::Cluster {
455 id: cluster.id,
456 name: cluster.name,
457 owner_id: cluster.owner_id,
458 privileges: cluster.privileges.into_all_values().collect(),
459 config: cluster.config.into(),
460 }
461 }
462}
463
464impl From<durable::Cluster> for Cluster {
465 fn from(
466 durable::Cluster {
467 id,
468 name,
469 owner_id,
470 privileges,
471 config,
472 }: durable::Cluster,
473 ) -> Self {
474 Cluster {
475 name: name.clone(),
476 id,
477 bound_objects: BTreeSet::new(),
478 log_indexes: BTreeMap::new(),
479 replica_id_by_name_: BTreeMap::new(),
480 replicas_by_id_: BTreeMap::new(),
481 owner_id,
482 privileges: PrivilegeMap::from_mz_acl_items(privileges),
483 config: config.into(),
484 }
485 }
486}
487
488impl UpdateFrom<durable::Cluster> for Cluster {
489 fn update_from(
490 &mut self,
491 durable::Cluster {
492 id,
493 name,
494 owner_id,
495 privileges,
496 config,
497 }: durable::Cluster,
498 ) {
499 self.id = id;
500 self.name = name;
501 self.owner_id = owner_id;
502 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
503 self.config = config.into();
504 }
505}
506
/// A single replica of a cluster.
///
/// Converts into `durable::ClusterReplica`.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    pub name: String,
    /// ID of the cluster this replica belongs to.
    pub cluster_id: ClusterId,
    pub replica_id: ReplicaId,
    pub config: ReplicaConfig,
    pub owner_id: RoleId,
}
515
516impl From<ClusterReplica> for durable::ClusterReplica {
517 fn from(replica: ClusterReplica) -> durable::ClusterReplica {
518 durable::ClusterReplica {
519 cluster_id: replica.cluster_id,
520 replica_id: replica.replica_id,
521 name: replica.name,
522 config: replica.config.into(),
523 owner_id: replica.owner_id,
524 }
525 }
526}
527
/// A cluster status paired with a UTC timestamp.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    pub status: ClusterStatus,
    // NOTE(review): presumably the time at which `status` was observed —
    // confirm against the code that produces these values.
    pub time: DateTime<Utc>,
}
533
/// A set of source references together with the time they were last updated.
///
/// Converts to and from both `durable::SourceReferences` (see
/// [`SourceReferences::to_durable`]) and `mz_sql::plan::SourceReferences`.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
    // NOTE(review): the unit and epoch of this timestamp are not visible
    // here — confirm against the code that writes it.
    pub updated_at: u64,
    pub references: Vec<SourceReference>,
}
539
/// A single reference exposed by a source.
///
/// Converts field-for-field to and from `durable::SourceReference` and
/// `mz_sql::plan::SourceReference`.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
    pub name: String,
    // NOTE(review): presumably the upstream schema or namespace containing
    // `name`, where one exists — confirm against the producers.
    pub namespace: Option<String>,
    /// Column names of the referenced object.
    pub columns: Vec<String>,
}
546
547impl From<SourceReference> for durable::SourceReference {
548 fn from(source_reference: SourceReference) -> durable::SourceReference {
549 durable::SourceReference {
550 name: source_reference.name,
551 namespace: source_reference.namespace,
552 columns: source_reference.columns,
553 }
554 }
555}
556
557impl SourceReferences {
558 pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
559 durable::SourceReferences {
560 source_id,
561 updated_at: self.updated_at,
562 references: self.references.into_iter().map(Into::into).collect(),
563 }
564 }
565}
566
567impl From<durable::SourceReference> for SourceReference {
568 fn from(source_reference: durable::SourceReference) -> SourceReference {
569 SourceReference {
570 name: source_reference.name,
571 namespace: source_reference.namespace,
572 columns: source_reference.columns,
573 }
574 }
575}
576
577impl From<durable::SourceReferences> for SourceReferences {
578 fn from(source_references: durable::SourceReferences) -> SourceReferences {
579 SourceReferences {
580 updated_at: source_references.updated_at,
581 references: source_references
582 .references
583 .into_iter()
584 .map(|source_reference| source_reference.into())
585 .collect(),
586 }
587 }
588}
589
590impl From<mz_sql::plan::SourceReference> for SourceReference {
591 fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
592 SourceReference {
593 name: source_reference.name,
594 namespace: source_reference.namespace,
595 columns: source_reference.columns,
596 }
597 }
598}
599
600impl From<mz_sql::plan::SourceReferences> for SourceReferences {
601 fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
602 SourceReferences {
603 updated_at: source_references.updated_at,
604 references: source_references
605 .references
606 .into_iter()
607 .map(|source_reference| source_reference.into())
608 .collect(),
609 }
610 }
611}
612
613impl From<SourceReferences> for mz_sql::plan::SourceReferences {
614 fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
615 mz_sql::plan::SourceReferences {
616 updated_at: source_references.updated_at,
617 references: source_references
618 .references
619 .into_iter()
620 .map(|source_reference| source_reference.into())
621 .collect(),
622 }
623 }
624}
625
626impl From<SourceReference> for mz_sql::plan::SourceReference {
627 fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
628 mz_sql::plan::SourceReference {
629 name: source_reference.name,
630 namespace: source_reference.namespace,
631 columns: source_reference.columns,
632 }
633 }
634}
635
/// A catalog item together with its identity, ownership, privileges, and
/// reverse-dependency lists.
///
/// Converts into `durable::Item`; `referenced_by` and `used_by` are not part
/// of that conversion.
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    pub item: CatalogItem,
    /// Not serialized.
    // NOTE(review): presumably the IDs of items that reference this one — confirm.
    #[serde(skip)]
    pub referenced_by: Vec<CatalogItemId>,
    /// Not serialized.
    // NOTE(review): presumably the IDs of items that depend on this one;
    // how this differs from `referenced_by` is not visible here — confirm.
    #[serde(skip)]
    pub used_by: Vec<CatalogItemId>,
    pub id: CatalogItemId,
    pub oid: u32,
    pub name: QualifiedItemName,
    pub owner_id: RoleId,
    pub privileges: PrivilegeMap,
}
652
/// A [`CatalogEntry`] paired with the relation version it should be viewed at.
///
/// Dereferences to the wrapped entry; `version` selects which description
/// and which `GlobalId` the entry resolves to.
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
    pub entry: CatalogEntry,
    pub version: RelationVersionSelector,
}
675
676impl CatalogCollectionEntry {
677 pub fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
678 self.item().desc(name, self.version)
679 }
680
681 pub fn desc_opt(&self) -> Option<Cow<'_, RelationDesc>> {
682 self.item().desc_opt(self.version)
683 }
684}
685
686impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
687 fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
688 CatalogCollectionEntry::desc(self, name)
689 }
690
691 fn global_id(&self) -> GlobalId {
692 self.entry
693 .item()
694 .global_id_for_version(self.version)
695 .expect("catalog corruption, missing version!")
696 }
697}
698
699impl Deref for CatalogCollectionEntry {
700 type Target = CatalogEntry;
701
702 fn deref(&self) -> &CatalogEntry {
703 &self.entry
704 }
705}
706
// Implements the SQL-layer item interface by forwarding to the wrapped
// `CatalogEntry`. None of these methods read `self.version`; `at_version`
// is the only one concerned with versions, and it uses its argument.
impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
    /// Forwards to the wrapped entry.
    fn name(&self) -> &QualifiedItemName {
        self.entry.name()
    }

    /// Forwards to the wrapped entry.
    fn id(&self) -> CatalogItemId {
        self.entry.id()
    }

    /// Forwards to the wrapped entry, boxing the iterator it returns.
    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.entry.global_ids())
    }

    /// Forwards to the wrapped entry.
    fn oid(&self) -> u32 {
        self.entry.oid()
    }

    /// Forwards to the wrapped entry.
    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.entry.func()
    }

    /// Forwards to the wrapped entry.
    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.entry.source_desc()
    }

    /// Forwards to the wrapped entry's `CatalogItem::connection`.
    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        // Called through the trait path rather than with method syntax.
        // NOTE(review): presumably because `CatalogEntry` also has an inherent
        // `connection` with a different signature — confirm.
        mz_sql::catalog::CatalogItem::connection(&self.entry)
    }

    /// Forwards to the wrapped entry.
    fn create_sql(&self) -> &str {
        self.entry.create_sql()
    }

    /// Forwards to the wrapped entry.
    fn item_type(&self) -> SqlCatalogItemType {
        self.entry.item_type()
    }

    /// Forwards to the wrapped entry.
    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        self.entry.index_details()
    }

    /// Forwards to the wrapped entry.
    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        self.entry.writable_table_details()
    }

    /// Forwards to the wrapped entry.
    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        self.entry.type_details()
    }

    /// Forwards to the wrapped entry.
    fn references(&self) -> &ResolvedIds {
        self.entry.references()
    }

    /// Forwards to the wrapped entry.
    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.entry.uses()
    }

    /// Forwards to the wrapped entry.
    fn referenced_by(&self) -> &[CatalogItemId] {
        self.entry.referenced_by()
    }

    /// Forwards to the wrapped entry.
    fn used_by(&self) -> &[CatalogItemId] {
        self.entry.used_by()
    }

    /// Forwards to the wrapped entry.
    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.entry.subsource_details()
    }

    /// Forwards to the wrapped entry.
    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.entry.source_export_details()
    }

    /// Forwards to the wrapped entry.
    fn is_progress_source(&self) -> bool {
        self.entry.is_progress_source()
    }

    /// Forwards to the wrapped entry.
    fn progress_id(&self) -> Option<CatalogItemId> {
        self.entry.progress_id()
    }

    /// Returns a copy of the owner ID that the wrapped entry hands out by reference.
    fn owner_id(&self) -> RoleId {
        *self.entry.owner_id()
    }

    /// Forwards to the wrapped entry.
    fn privileges(&self) -> &PrivilegeMap {
        self.entry.privileges()
    }

    /// Forwards to the wrapped entry's item.
    fn cluster_id(&self) -> Option<ClusterId> {
        self.entry.item().cluster_id()
    }

    /// Returns a boxed copy of this entry pinned to `version`.
    ///
    /// The wrapped entry is cloned; `self` is left unchanged.
    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        Box::new(CatalogCollectionEntry {
            entry: self.entry.clone(),
            version,
        })
    }

    /// Forwards to the wrapped entry.
    fn latest_version(&self) -> Option<RelationVersion> {
        self.entry.latest_version()
    }
}
826
/// The kinds of item a catalog entry can hold, one variant per item type.
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    Table(Table),
    Source(Source),
    Log(Log),
    View(View),
    MaterializedView(MaterializedView),
    Sink(Sink),
    Index(Index),
    Type(Type),
    Func(Func),
    Secret(Secret),
    Connection(Connection),
    ContinualTask(ContinualTask),
}
842
843impl From<CatalogEntry> for durable::Item {
844 fn from(entry: CatalogEntry) -> durable::Item {
845 let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
846 durable::Item {
847 id: entry.id,
848 oid: entry.oid,
849 global_id,
850 schema_id: entry.name.qualifiers.schema_spec.into(),
851 name: entry.name.item,
852 create_sql,
853 owner_id: entry.owner_id,
854 privileges: entry.privileges.into_all_values().collect(),
855 extra_versions,
856 }
857 }
858}
859
/// A table, possibly with several relation versions.
#[derive(Debug, Clone, Serialize)]
pub struct Table {
    // NOTE(review): presumably the SQL that defines this table, `None` when
    // there is none — confirm.
    pub create_sql: Option<String>,
    /// Versioned description of the table's columns.
    pub desc: VersionedRelationDesc,
    /// `GlobalId` of the collection backing each version. The entry with the
    /// greatest version is the one [`Table::global_id_writes`] returns.
    /// Serialized through `map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// Not serialized.
    // NOTE(review): presumably set only for tables tied to one connection,
    // e.g. temporary tables — confirm.
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    pub resolved_ids: ResolvedIds,
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    pub is_retained_metrics_object: bool,
    /// Where the table's data comes from; also determines [`Table::timeline`].
    pub data_source: TableDataSource,
}
884
885impl Table {
886 pub fn timeline(&self) -> Timeline {
887 match &self.data_source {
888 TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
891 TableDataSource::DataSource { timeline, .. } => timeline.clone(),
892 }
893 }
894
895 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
897 self.collections.values().copied()
898 }
899
900 pub fn global_id_writes(&self) -> GlobalId {
902 *self
903 .collections
904 .last_key_value()
905 .expect("at least one version of a table")
906 .1
907 }
908
909 pub fn collection_descs(
911 &self,
912 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
913 self.collections.iter().map(|(version, gid)| {
914 let desc = self
915 .desc
916 .at_version(RelationVersionSelector::Specific(*version));
917 (*gid, *version, desc)
918 })
919 }
920
921 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
923 let (version, _gid) = self
924 .collections
925 .iter()
926 .find(|(_version, gid)| *gid == id)
927 .expect("GlobalId to exist");
928 self.desc
929 .at_version(RelationVersionSelector::Specific(*version))
930 }
931}
932
/// Where a [`Table`]'s data comes from.
#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
    /// The table is populated by table writes.
    TableWrites {
        /// Not serialized.
        // NOTE(review): presumably the default expression for each column — confirm.
        #[serde(skip)]
        defaults: Vec<Expr<Aug>>,
    },

    /// The table is populated from a data source on the given timeline.
    DataSource {
        desc: DataSourceDesc,
        timeline: Timeline,
    },
}
948
/// Describes where the data of a source, or of a table backed by a data
/// source, comes from.
#[derive(Debug, Clone, Serialize)]
pub enum DataSourceDesc {
    /// An ingestion running on `cluster_id`.
    Ingestion {
        desc: SourceDesc<ReferencedConnection>,
        cluster_id: ClusterId,
    },
    /// An ingestion that also carries its own export configuration and the
    /// ID of its progress subsource.
    // NOTE(review): the name suggests this corresponds to an older
    // `CREATE SOURCE` syntax — confirm.
    OldSyntaxIngestion {
        desc: SourceDesc<ReferencedConnection>,
        cluster_id: ClusterId,
        progress_subsource: CatalogItemId,
        data_config: SourceExportDataConfig<ReferencedConnection>,
        details: SourceExportDetails,
    },
    /// An export of the ingestion identified by `ingestion_id`.
    IngestionExport {
        ingestion_id: CatalogItemId,
        external_reference: UnresolvedItemName,
        details: SourceExportDetails,
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// An introspection collection of the given type.
    Introspection(IntrospectionType),
    /// A progress collection.
    Progress,
    /// A webhook running on `cluster_id`.
    Webhook {
        validate_using: Option<WebhookValidation>,
        body_format: WebhookBodyFormat,
        headers: WebhookHeaders,
        cluster_id: ClusterId,
    },
}
995
996impl DataSourceDesc {
997 pub fn formats(&self) -> (Option<&str>, Option<&str>) {
999 match &self {
1000 DataSourceDesc::Ingestion { .. } => (None, None),
1001 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1002 match &data_config.encoding.as_ref() {
1003 Some(encoding) => match &encoding.key {
1004 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1005 None => (None, Some(encoding.value.type_())),
1006 },
1007 None => (None, None),
1008 }
1009 }
1010 DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1011 Some(encoding) => match &encoding.key {
1012 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1013 None => (None, Some(encoding.value.type_())),
1014 },
1015 None => (None, None),
1016 },
1017 DataSourceDesc::Introspection(_)
1018 | DataSourceDesc::Webhook { .. }
1019 | DataSourceDesc::Progress => (None, None),
1020 }
1021 }
1022
1023 pub fn envelope(&self) -> Option<&str> {
1025 fn envelope_string(envelope: &SourceEnvelope) -> &str {
1030 match envelope {
1031 SourceEnvelope::None(_) => "none",
1032 SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1033 mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1034 mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1035 "debezium"
1039 }
1040 mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1041 "upsert-value-err-inline"
1042 }
1043 },
1044 SourceEnvelope::CdcV2 => {
1045 "materialize"
1048 }
1049 }
1050 }
1051
1052 match self {
1053 DataSourceDesc::Ingestion { .. } => None,
1058 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1059 Some(envelope_string(&data_config.envelope))
1060 }
1061 DataSourceDesc::IngestionExport { data_config, .. } => {
1062 Some(envelope_string(&data_config.envelope))
1063 }
1064 DataSourceDesc::Introspection(_)
1065 | DataSourceDesc::Webhook { .. }
1066 | DataSourceDesc::Progress => None,
1067 }
1068 }
1069}
1070
/// A source.
#[derive(Debug, Clone, Serialize)]
pub struct Source {
    // NOTE(review): presumably the SQL that defines this source, `None` when
    // there is none — confirm.
    pub create_sql: Option<String>,
    /// `GlobalId` of this source; also returned by [`Source::global_id`].
    pub global_id: GlobalId,
    /// Where the source's data comes from. Not serialized.
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    pub desc: RelationDesc,
    pub timeline: Timeline,
    pub resolved_ids: ResolvedIds,
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    pub is_retained_metrics_object: bool,
}
1094
1095impl Source {
1096 pub fn new(
1103 plan: CreateSourcePlan,
1104 global_id: GlobalId,
1105 resolved_ids: ResolvedIds,
1106 custom_logical_compaction_window: Option<CompactionWindow>,
1107 is_retained_metrics_object: bool,
1108 ) -> Source {
1109 Source {
1110 create_sql: Some(plan.source.create_sql),
1111 data_source: match plan.source.data_source {
1112 mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1113 desc,
1114 cluster_id: plan
1115 .in_cluster
1116 .expect("ingestion-based sources must be given a cluster ID"),
1117 },
1118 mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1119 desc,
1120 progress_subsource,
1121 data_config,
1122 details,
1123 } => DataSourceDesc::OldSyntaxIngestion {
1124 desc,
1125 cluster_id: plan
1126 .in_cluster
1127 .expect("ingestion-based sources must be given a cluster ID"),
1128 progress_subsource,
1129 data_config,
1130 details,
1131 },
1132 mz_sql::plan::DataSourceDesc::Progress => {
1133 assert!(
1134 plan.in_cluster.is_none(),
1135 "subsources must not have a host config or cluster_id defined"
1136 );
1137 DataSourceDesc::Progress
1138 }
1139 mz_sql::plan::DataSourceDesc::IngestionExport {
1140 ingestion_id,
1141 external_reference,
1142 details,
1143 data_config,
1144 } => {
1145 assert!(
1146 plan.in_cluster.is_none(),
1147 "subsources must not have a host config or cluster_id defined"
1148 );
1149 DataSourceDesc::IngestionExport {
1150 ingestion_id,
1151 external_reference,
1152 details,
1153 data_config,
1154 }
1155 }
1156 mz_sql::plan::DataSourceDesc::Webhook {
1157 validate_using,
1158 body_format,
1159 headers,
1160 cluster_id,
1161 } => {
1162 mz_ore::soft_assert_or_log!(
1163 cluster_id.is_none(),
1164 "cluster_id set at Source level for Webhooks"
1165 );
1166 DataSourceDesc::Webhook {
1167 validate_using,
1168 body_format,
1169 headers,
1170 cluster_id: plan
1171 .in_cluster
1172 .expect("webhook sources must be given a cluster ID"),
1173 }
1174 }
1175 },
1176 desc: plan.source.desc,
1177 global_id,
1178 timeline: plan.timeline,
1179 resolved_ids,
1180 custom_logical_compaction_window: plan
1181 .source
1182 .compaction_window
1183 .or(custom_logical_compaction_window),
1184 is_retained_metrics_object,
1185 }
1186 }
1187
1188 pub fn source_type(&self) -> &str {
1190 match &self.data_source {
1191 DataSourceDesc::Ingestion { desc, .. }
1192 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
1193 DataSourceDesc::Progress => "progress",
1194 DataSourceDesc::IngestionExport { .. } => "subsource",
1195 DataSourceDesc::Introspection(_) => "source",
1196 DataSourceDesc::Webhook { .. } => "webhook",
1197 }
1198 }
1199
1200 pub fn connection_id(&self) -> Option<CatalogItemId> {
1202 match &self.data_source {
1203 DataSourceDesc::Ingestion { desc, .. }
1204 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
1205 DataSourceDesc::IngestionExport { .. }
1206 | DataSourceDesc::Introspection(_)
1207 | DataSourceDesc::Webhook { .. }
1208 | DataSourceDesc::Progress => None,
1209 }
1210 }
1211
1212 pub fn global_id(&self) -> GlobalId {
1214 self.global_id
1215 }
1216
1217 pub fn user_controllable_persist_shard_count(&self) -> i64 {
1225 match &self.data_source {
1226 DataSourceDesc::Ingestion { .. } => 0,
1227 DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1228 match &desc.connection {
1229 GenericSourceConnection::Postgres(_)
1233 | GenericSourceConnection::MySql(_)
1234 | GenericSourceConnection::SqlServer(_) => 0,
1235 GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
1236 LoadGenerator::Clock
1238 | LoadGenerator::Counter { .. }
1239 | LoadGenerator::Datums
1240 | LoadGenerator::KeyValue(_) => 1,
1241 LoadGenerator::Auction
1242 | LoadGenerator::Marketing
1243 | LoadGenerator::Tpch { .. } => 0,
1244 },
1245 GenericSourceConnection::Kafka(_) => 1,
1246 }
1247 }
1248 DataSourceDesc::IngestionExport { .. } => 1,
1251 DataSourceDesc::Webhook { .. } => 1,
1252 DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => 0,
1255 }
1256 }
1257}
1258
/// A log collection.
#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// Which log this is.
    pub variant: LogVariant,
    /// `GlobalId` of this log; also returned by [`Log::global_id`].
    pub global_id: GlobalId,
}
1266
1267impl Log {
1268 pub fn global_id(&self) -> GlobalId {
1270 self.global_id
1271 }
1272}
1273
/// A sink.
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    // NOTE(review): presumably the SQL that defines this sink — confirm.
    pub create_sql: String,
    /// `GlobalId` of this sink; also returned by [`Sink::global_id`].
    pub global_id: GlobalId,
    // NOTE(review): presumably the collection this sink reads from — confirm.
    pub from: GlobalId,
    /// Connection the sink writes through; determines [`Sink::sink_type`] and
    /// the reported formats.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    pub envelope: SinkEnvelope,
    pub with_snapshot: bool,
    pub version: u64,
    pub resolved_ids: ResolvedIds,
    pub cluster_id: ClusterId,
}
1297
1298impl Sink {
1299 pub fn sink_type(&self) -> &str {
1300 self.connection.name()
1301 }
1302
1303 pub fn envelope(&self) -> Option<&str> {
1305 match &self.envelope {
1306 SinkEnvelope::Debezium => Some("debezium"),
1307 SinkEnvelope::Upsert => Some("upsert"),
1308 }
1309 }
1310
1311 pub fn combined_format(&self) -> Option<Cow<'_, str>> {
1316 match &self.connection {
1317 StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
1318 _ => None,
1319 }
1320 }
1321
1322 pub fn formats(&self) -> Option<(Option<&str>, &str)> {
1324 match &self.connection {
1325 StorageSinkConnection::Kafka(connection) => {
1326 let key_format = connection
1327 .format
1328 .key_format
1329 .as_ref()
1330 .map(|f| f.get_format_name());
1331 let value_format = connection.format.value_format.get_format_name();
1332 Some((key_format, value_format))
1333 }
1334 _ => None,
1335 }
1336 }
1337
1338 pub fn connection_id(&self) -> Option<CatalogItemId> {
1339 self.connection.connection_id()
1340 }
1341
1342 pub fn global_id(&self) -> GlobalId {
1344 self.global_id
1345 }
1346}
1347
/// In-memory catalog item for a (non-materialized) view.
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// SQL text of the statement that created the view; rewritten in place by the rename
    /// helpers and `CatalogItem::update_sql`.
    pub create_sql: String,
    /// The [`GlobalId`] of the view.
    pub global_id: GlobalId,
    /// The view's query as a shared HIR expression.
    /// NOTE(review): presumably the unoptimized form of `optimized_expr` — confirm.
    pub raw_expr: Arc<HirRelationExpr>,
    /// The view's query as a shared, optimized MIR expression.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Relation description returned for this view by `CatalogItem::desc_opt`.
    pub desc: RelationDesc,
    /// Connection that owns the view, if any. `CatalogItem::is_temporary` reports an item as
    /// temporary exactly when this is set.
    pub conn_id: Option<ConnectionId>,
    /// Items referenced by the view; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Additional item ids that `CatalogItem::uses` adds on top of `resolved_ids`.
    pub dependencies: DependencyIds,
}
1367
1368impl View {
1369 pub fn global_id(&self) -> GlobalId {
1371 self.global_id
1372 }
1373}
1374
/// In-memory catalog item for a materialized view.
///
/// A materialized view can exist at several relation versions; `collections` maps each
/// version to its [`GlobalId`] and `desc` can be projected to any of those versions.
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// SQL text of the statement that created the materialized view; rewritten in place by
    /// the rename helpers and `CatalogItem::update_sql`.
    pub create_sql: String,
    /// [`GlobalId`] of every version of this materialized view. Keys are serialized as strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// The query as a shared HIR expression.
    /// NOTE(review): presumably the unoptimized form of `optimized_expr` — confirm.
    pub raw_expr: Arc<HirRelationExpr>,
    /// The query as a shared, optimized MIR expression.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Versioned relation description; projected with `at_version` to obtain a concrete
    /// [`RelationDesc`].
    pub desc: VersionedRelationDesc,
    /// Items referenced by the materialized view; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Additional item ids that `CatalogItem::uses` adds on top of `resolved_ids`.
    pub dependencies: DependencyIds,
    /// Cluster reported for this materialized view by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// NOTE(review): presumably indices of columns asserted to be non-null — confirm.
    pub non_null_assertions: Vec<usize>,
    /// Custom compaction window, if one was configured; written by
    /// `CatalogItem::update_retain_history`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Optional refresh schedule of the materialized view.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// NOTE(review): presumably the as-of frontier chosen at creation time — confirm.
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1408
1409impl MaterializedView {
1410 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1412 self.collections.values().copied()
1413 }
1414
1415 pub fn global_id_writes(&self) -> GlobalId {
1418 *self
1419 .collections
1420 .last_key_value()
1421 .expect("at least one version of a materialized view")
1422 .1
1423 }
1424
1425 pub fn collection_descs(
1427 &self,
1428 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
1429 self.collections.iter().map(|(version, gid)| {
1430 let desc = self
1431 .desc
1432 .at_version(RelationVersionSelector::Specific(*version));
1433 (*gid, *version, desc)
1434 })
1435 }
1436
1437 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
1439 let (version, _gid) = self
1440 .collections
1441 .iter()
1442 .find(|(_version, gid)| *gid == id)
1443 .expect("GlobalId to exist");
1444 self.desc
1445 .at_version(RelationVersionSelector::Specific(*version))
1446 }
1447}
1448
/// In-memory catalog item for an index.
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// SQL text of the statement that created the index; rewritten in place by the rename
    /// helpers and `CatalogItem::update_sql`.
    pub create_sql: String,
    /// The [`GlobalId`] of the index itself.
    pub global_id: GlobalId,
    /// NOTE(review): presumably the collection the index is built on — confirm.
    pub on: GlobalId,
    /// Key expressions of the index, shared behind an `Arc`.
    pub keys: Arc<[MirScalarExpr]>,
    /// Connection that owns the index, if any. `CatalogItem::is_temporary` reports an item as
    /// temporary exactly when this is set.
    pub conn_id: Option<ConnectionId>,
    /// Items referenced by the index; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Cluster reported for this index by `CatalogItem::cluster_id` and
    /// `CatalogItem::is_compute_object_on_cluster`.
    pub cluster_id: ClusterId,
    /// Custom compaction window, if one was configured; written by
    /// `CatalogItem::update_retain_history`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Flag surfaced by `CatalogItem::is_retained_metrics_object`.
    pub is_retained_metrics_object: bool,
}
1473
1474impl Index {
1475 pub fn global_id(&self) -> GlobalId {
1477 self.global_id
1478 }
1479}
1480
/// In-memory catalog item for a type.
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// SQL text of the statement that created the type. `None` for builtin types, which
    /// `CatalogItem::to_serialized` refuses to serialize.
    pub create_sql: Option<String>,
    /// The [`GlobalId`] of the type.
    pub global_id: GlobalId,
    /// Details of the type; excluded from serialization.
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// Relation description of the type, if it has one; returned by `CatalogItem::desc_opt`.
    pub desc: Option<RelationDesc>,
    /// Items referenced by the type; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
}
1493
/// In-memory catalog item for a function.
///
/// Functions carry no `create_sql`; `CatalogItem::to_serialized` treats serializing one as
/// unreachable.
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// The statically allocated function definition; excluded from serialization.
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
    /// The [`GlobalId`] of the function.
    pub global_id: GlobalId,
}
1502
/// In-memory catalog item for a secret.
///
/// `CatalogItem::references` reports no references for secrets.
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// SQL text of the statement that created the secret; rewritten in place by the rename
    /// helpers and `CatalogItem::update_sql`.
    pub create_sql: String,
    /// The [`GlobalId`] of the secret.
    pub global_id: GlobalId,
}
1510
/// In-memory catalog item for a connection.
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// SQL text of the statement that created the connection; rewritten in place by the
    /// rename helpers and `CatalogItem::update_sql`.
    pub create_sql: String,
    /// The [`GlobalId`] of the connection.
    pub global_id: GlobalId,
    /// Details of the connection.
    pub details: ConnectionDetails,
    /// Items referenced by the connection; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
}
1522
1523impl Connection {
1524 pub fn global_id(&self) -> GlobalId {
1526 self.global_id
1527 }
1528}
1529
/// In-memory catalog item for a continual task.
#[derive(Debug, Clone, Serialize)]
pub struct ContinualTask {
    /// SQL text of the statement that created the continual task; rewritten in place by the
    /// rename helpers and `CatalogItem::update_sql`.
    pub create_sql: String,
    /// The [`GlobalId`] of the continual task itself.
    pub global_id: GlobalId,
    /// NOTE(review): presumably the collection the task takes as input — confirm.
    pub input_id: GlobalId,
    /// NOTE(review): presumably whether the task starts from a snapshot of its input — confirm.
    pub with_snapshot: bool,
    /// The task's query as a shared HIR expression.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Relation description returned for this task by `CatalogItem::desc_opt`.
    pub desc: RelationDesc,
    /// Items referenced by the task; returned by `CatalogItem::references`.
    pub resolved_ids: ResolvedIds,
    /// Additional item ids that `CatalogItem::uses` adds on top of `resolved_ids`.
    pub dependencies: DependencyIds,
    /// Cluster reported for this task by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// NOTE(review): presumably the as-of frontier chosen at creation time — confirm.
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1555
1556impl ContinualTask {
1557 pub fn global_id(&self) -> GlobalId {
1559 self.global_id
1560 }
1561}
1562
/// In-memory representation of a network policy.
///
/// Converts to and from `durable::NetworkPolicy`; the only field whose representation
/// differs between the two is `privileges`.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
    /// Name of the policy.
    pub name: String,
    /// Identifier of the policy.
    pub id: NetworkPolicyId,
    /// OID of the policy.
    pub oid: u32,
    /// Rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
    /// Role that owns the policy.
    pub owner_id: RoleId,
    /// Privileges on the policy, held as a [`PrivilegeMap`] (the durable form stores the
    /// flattened ACL items instead).
    pub privileges: PrivilegeMap,
}
1572
1573impl From<NetworkPolicy> for durable::NetworkPolicy {
1574 fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1575 durable::NetworkPolicy {
1576 id: policy.id,
1577 oid: policy.oid,
1578 name: policy.name,
1579 rules: policy.rules,
1580 owner_id: policy.owner_id,
1581 privileges: policy.privileges.into_all_values().collect(),
1582 }
1583 }
1584}
1585
1586impl From<durable::NetworkPolicy> for NetworkPolicy {
1587 fn from(
1588 durable::NetworkPolicy {
1589 id,
1590 oid,
1591 name,
1592 rules,
1593 owner_id,
1594 privileges,
1595 }: durable::NetworkPolicy,
1596 ) -> Self {
1597 NetworkPolicy {
1598 id,
1599 oid,
1600 name,
1601 rules,
1602 owner_id,
1603 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1604 }
1605 }
1606}
1607
1608impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1609 fn update_from(
1610 &mut self,
1611 durable::NetworkPolicy {
1612 id,
1613 oid,
1614 name,
1615 rules,
1616 owner_id,
1617 privileges,
1618 }: durable::NetworkPolicy,
1619 ) {
1620 self.id = id;
1621 self.oid = oid;
1622 self.name = name;
1623 self.rules = rules;
1624 self.owner_id = owner_id;
1625 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1626 }
1627}
1628
1629impl CatalogItem {
1630 pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1632 match self {
1633 CatalogItem::Table(_) => CatalogItemType::Table,
1634 CatalogItem::Source(_) => CatalogItemType::Source,
1635 CatalogItem::Log(_) => CatalogItemType::Source,
1636 CatalogItem::Sink(_) => CatalogItemType::Sink,
1637 CatalogItem::View(_) => CatalogItemType::View,
1638 CatalogItem::MaterializedView(_) => CatalogItemType::MaterializedView,
1639 CatalogItem::Index(_) => CatalogItemType::Index,
1640 CatalogItem::Type(_) => CatalogItemType::Type,
1641 CatalogItem::Func(_) => CatalogItemType::Func,
1642 CatalogItem::Secret(_) => CatalogItemType::Secret,
1643 CatalogItem::Connection(_) => CatalogItemType::Connection,
1644 CatalogItem::ContinualTask(_) => CatalogItemType::ContinualTask,
1645 }
1646 }
1647
1648 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1650 let gid = match self {
1651 CatalogItem::Source(source) => source.global_id,
1652 CatalogItem::Log(log) => log.global_id,
1653 CatalogItem::Sink(sink) => sink.global_id,
1654 CatalogItem::View(view) => view.global_id,
1655 CatalogItem::MaterializedView(mv) => {
1656 return itertools::Either::Left(mv.collections.values().copied());
1657 }
1658 CatalogItem::ContinualTask(ct) => ct.global_id,
1659 CatalogItem::Index(index) => index.global_id,
1660 CatalogItem::Func(func) => func.global_id,
1661 CatalogItem::Type(ty) => ty.global_id,
1662 CatalogItem::Secret(secret) => secret.global_id,
1663 CatalogItem::Connection(conn) => conn.global_id,
1664 CatalogItem::Table(table) => {
1665 return itertools::Either::Left(table.collections.values().copied());
1666 }
1667 };
1668 itertools::Either::Right(std::iter::once(gid))
1669 }
1670
1671 pub fn latest_global_id(&self) -> GlobalId {
1675 match self {
1676 CatalogItem::Source(source) => source.global_id,
1677 CatalogItem::Log(log) => log.global_id,
1678 CatalogItem::Sink(sink) => sink.global_id,
1679 CatalogItem::View(view) => view.global_id,
1680 CatalogItem::MaterializedView(mv) => mv.global_id_writes(),
1681 CatalogItem::ContinualTask(ct) => ct.global_id,
1682 CatalogItem::Index(index) => index.global_id,
1683 CatalogItem::Func(func) => func.global_id,
1684 CatalogItem::Type(ty) => ty.global_id,
1685 CatalogItem::Secret(secret) => secret.global_id,
1686 CatalogItem::Connection(conn) => conn.global_id,
1687 CatalogItem::Table(table) => table.global_id_writes(),
1688 }
1689 }
1690
1691 pub fn is_storage_collection(&self) -> bool {
1693 match self {
1694 CatalogItem::Table(_)
1695 | CatalogItem::Source(_)
1696 | CatalogItem::MaterializedView(_)
1697 | CatalogItem::Sink(_)
1698 | CatalogItem::ContinualTask(_) => true,
1699 CatalogItem::Log(_)
1700 | CatalogItem::View(_)
1701 | CatalogItem::Index(_)
1702 | CatalogItem::Type(_)
1703 | CatalogItem::Func(_)
1704 | CatalogItem::Secret(_)
1705 | CatalogItem::Connection(_) => false,
1706 }
1707 }
1708
1709 pub fn desc(
1710 &self,
1711 name: &FullItemName,
1712 version: RelationVersionSelector,
1713 ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
1714 self.desc_opt(version)
1715 .ok_or_else(|| SqlCatalogError::InvalidDependency {
1716 name: name.to_string(),
1717 typ: self.typ(),
1718 })
1719 }
1720
1721 pub fn desc_opt(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1722 match &self {
1723 CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1724 CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1725 CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1726 CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1727 CatalogItem::MaterializedView(mview) => {
1728 Some(Cow::Owned(mview.desc.at_version(version)))
1729 }
1730 CatalogItem::Type(typ) => typ.desc.as_ref().map(Cow::Borrowed),
1731 CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
1732 CatalogItem::Func(_)
1733 | CatalogItem::Index(_)
1734 | CatalogItem::Sink(_)
1735 | CatalogItem::Secret(_)
1736 | CatalogItem::Connection(_) => None,
1737 }
1738 }
1739
1740 pub fn func(
1741 &self,
1742 entry: &CatalogEntry,
1743 ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1744 match &self {
1745 CatalogItem::Func(func) => Ok(func.inner),
1746 _ => Err(SqlCatalogError::UnexpectedType {
1747 name: entry.name().item.to_string(),
1748 actual_type: entry.item_type(),
1749 expected_type: CatalogItemType::Func,
1750 }),
1751 }
1752 }
1753
1754 pub fn source_desc(
1755 &self,
1756 entry: &CatalogEntry,
1757 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1758 match &self {
1759 CatalogItem::Source(source) => match &source.data_source {
1760 DataSourceDesc::Ingestion { desc, .. }
1761 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1762 DataSourceDesc::IngestionExport { .. }
1763 | DataSourceDesc::Introspection(_)
1764 | DataSourceDesc::Webhook { .. }
1765 | DataSourceDesc::Progress => Ok(None),
1766 },
1767 _ => Err(SqlCatalogError::UnexpectedType {
1768 name: entry.name().item.to_string(),
1769 actual_type: entry.item_type(),
1770 expected_type: CatalogItemType::Source,
1771 }),
1772 }
1773 }
1774
1775 pub fn is_progress_source(&self) -> bool {
1777 matches!(
1778 self,
1779 CatalogItem::Source(Source {
1780 data_source: DataSourceDesc::Progress,
1781 ..
1782 })
1783 )
1784 }
1785
1786 pub fn references(&self) -> &ResolvedIds {
1789 static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1790 match self {
1791 CatalogItem::Func(_) => &*EMPTY,
1792 CatalogItem::Index(idx) => &idx.resolved_ids,
1793 CatalogItem::Sink(sink) => &sink.resolved_ids,
1794 CatalogItem::Source(source) => &source.resolved_ids,
1795 CatalogItem::Log(_) => &*EMPTY,
1796 CatalogItem::Table(table) => &table.resolved_ids,
1797 CatalogItem::Type(typ) => &typ.resolved_ids,
1798 CatalogItem::View(view) => &view.resolved_ids,
1799 CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1800 CatalogItem::Secret(_) => &*EMPTY,
1801 CatalogItem::Connection(connection) => &connection.resolved_ids,
1802 CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
1803 }
1804 }
1805
1806 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1812 let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1813 match self {
1814 CatalogItem::Func(_) => {}
1817 CatalogItem::Index(_) => {}
1818 CatalogItem::Sink(_) => {}
1819 CatalogItem::Source(_) => {}
1820 CatalogItem::Log(_) => {}
1821 CatalogItem::Table(_) => {}
1822 CatalogItem::Type(_) => {}
1823 CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1824 CatalogItem::MaterializedView(mview) => {
1825 uses.extend(mview.dependencies.0.iter().copied())
1826 }
1827 CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
1828 CatalogItem::Secret(_) => {}
1829 CatalogItem::Connection(_) => {}
1830 }
1831 uses
1832 }
1833
1834 pub fn conn_id(&self) -> Option<&ConnectionId> {
1837 match self {
1838 CatalogItem::View(view) => view.conn_id.as_ref(),
1839 CatalogItem::Index(index) => index.conn_id.as_ref(),
1840 CatalogItem::Table(table) => table.conn_id.as_ref(),
1841 CatalogItem::Log(_)
1842 | CatalogItem::Source(_)
1843 | CatalogItem::Sink(_)
1844 | CatalogItem::MaterializedView(_)
1845 | CatalogItem::Secret(_)
1846 | CatalogItem::Type(_)
1847 | CatalogItem::Func(_)
1848 | CatalogItem::Connection(_)
1849 | CatalogItem::ContinualTask(_) => None,
1850 }
1851 }
1852
1853 pub fn is_temporary(&self) -> bool {
1855 self.conn_id().is_some()
1856 }
1857
1858 pub fn rename_schema_refs(
1859 &self,
1860 database_name: &str,
1861 cur_schema_name: &str,
1862 new_schema_name: &str,
1863 ) -> Result<CatalogItem, (String, String)> {
1864 let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
1865 let mut create_stmt = mz_sql::parse::parse(&create_sql)
1866 .expect("invalid create sql persisted to catalog")
1867 .into_element()
1868 .ast;
1869
1870 mz_sql::ast::transform::create_stmt_rename_schema_refs(
1872 &mut create_stmt,
1873 database_name,
1874 cur_schema_name,
1875 new_schema_name,
1876 )?;
1877
1878 Ok(create_stmt.to_ast_string_stable())
1879 };
1880
1881 match self {
1882 CatalogItem::Table(i) => {
1883 let mut i = i.clone();
1884 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1885 Ok(CatalogItem::Table(i))
1886 }
1887 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1888 CatalogItem::Source(i) => {
1889 let mut i = i.clone();
1890 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1891 Ok(CatalogItem::Source(i))
1892 }
1893 CatalogItem::Sink(i) => {
1894 let mut i = i.clone();
1895 i.create_sql = do_rewrite(i.create_sql)?;
1896 Ok(CatalogItem::Sink(i))
1897 }
1898 CatalogItem::View(i) => {
1899 let mut i = i.clone();
1900 i.create_sql = do_rewrite(i.create_sql)?;
1901 Ok(CatalogItem::View(i))
1902 }
1903 CatalogItem::MaterializedView(i) => {
1904 let mut i = i.clone();
1905 i.create_sql = do_rewrite(i.create_sql)?;
1906 Ok(CatalogItem::MaterializedView(i))
1907 }
1908 CatalogItem::Index(i) => {
1909 let mut i = i.clone();
1910 i.create_sql = do_rewrite(i.create_sql)?;
1911 Ok(CatalogItem::Index(i))
1912 }
1913 CatalogItem::Secret(i) => {
1914 let mut i = i.clone();
1915 i.create_sql = do_rewrite(i.create_sql)?;
1916 Ok(CatalogItem::Secret(i))
1917 }
1918 CatalogItem::Connection(i) => {
1919 let mut i = i.clone();
1920 i.create_sql = do_rewrite(i.create_sql)?;
1921 Ok(CatalogItem::Connection(i))
1922 }
1923 CatalogItem::Type(i) => {
1924 let mut i = i.clone();
1925 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1926 Ok(CatalogItem::Type(i))
1927 }
1928 CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
1929 CatalogItem::ContinualTask(i) => {
1930 let mut i = i.clone();
1931 i.create_sql = do_rewrite(i.create_sql)?;
1932 Ok(CatalogItem::ContinualTask(i))
1933 }
1934 }
1935 }
1936
1937 pub fn rename_item_refs(
1941 &self,
1942 from: FullItemName,
1943 to_item_name: String,
1944 rename_self: bool,
1945 ) -> Result<CatalogItem, String> {
1946 let do_rewrite = |create_sql: String| -> Result<String, String> {
1947 let mut create_stmt = mz_sql::parse::parse(&create_sql)
1948 .expect("invalid create sql persisted to catalog")
1949 .into_element()
1950 .ast;
1951 if rename_self {
1952 mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
1953 }
1954 mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
1956 Ok(create_stmt.to_ast_string_stable())
1957 };
1958
1959 match self {
1960 CatalogItem::Table(i) => {
1961 let mut i = i.clone();
1962 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1963 Ok(CatalogItem::Table(i))
1964 }
1965 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1966 CatalogItem::Source(i) => {
1967 let mut i = i.clone();
1968 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1969 Ok(CatalogItem::Source(i))
1970 }
1971 CatalogItem::Sink(i) => {
1972 let mut i = i.clone();
1973 i.create_sql = do_rewrite(i.create_sql)?;
1974 Ok(CatalogItem::Sink(i))
1975 }
1976 CatalogItem::View(i) => {
1977 let mut i = i.clone();
1978 i.create_sql = do_rewrite(i.create_sql)?;
1979 Ok(CatalogItem::View(i))
1980 }
1981 CatalogItem::MaterializedView(i) => {
1982 let mut i = i.clone();
1983 i.create_sql = do_rewrite(i.create_sql)?;
1984 Ok(CatalogItem::MaterializedView(i))
1985 }
1986 CatalogItem::Index(i) => {
1987 let mut i = i.clone();
1988 i.create_sql = do_rewrite(i.create_sql)?;
1989 Ok(CatalogItem::Index(i))
1990 }
1991 CatalogItem::Secret(i) => {
1992 let mut i = i.clone();
1993 i.create_sql = do_rewrite(i.create_sql)?;
1994 Ok(CatalogItem::Secret(i))
1995 }
1996 CatalogItem::Func(_) | CatalogItem::Type(_) => {
1997 unreachable!("{}s cannot be renamed", self.typ())
1998 }
1999 CatalogItem::Connection(i) => {
2000 let mut i = i.clone();
2001 i.create_sql = do_rewrite(i.create_sql)?;
2002 Ok(CatalogItem::Connection(i))
2003 }
2004 CatalogItem::ContinualTask(i) => {
2005 let mut i = i.clone();
2006 i.create_sql = do_rewrite(i.create_sql)?;
2007 Ok(CatalogItem::ContinualTask(i))
2008 }
2009 }
2010 }
2011
/// Updates the `RETAIN HISTORY` option in this item's `create_sql` and stores `window` as
/// the item's custom compaction window.
///
/// With `value = Some(..)` the option is replaced (or appended if absent); with
/// `value = None` it is removed. On success, returns the value the option had before the
/// call, if any. Note that the compaction window is set to `Some(window)` in both cases.
///
/// # Errors
///
/// Returns `Err(())` if the item has no `create_sql` (see `update_sql`) or if the statement
/// is not a `CREATE TABLE`, `CREATE INDEX`, `CREATE SOURCE` or `CREATE MATERIALIZED VIEW`.
/// In that case neither the SQL nor the compaction window is modified.
///
/// # Panics
///
/// Panics if the SQL was rewritten but the item has no compaction window slot.
pub fn update_retain_history(
    &mut self,
    value: Option<Value>,
    window: CompactionWindow,
) -> Result<Option<WithOptionValue<Raw>>, ()> {
    let update = |mut ast: &mut Statement<Raw>| {
        // The four statement kinds use distinct option and option-name types, so the
        // shared logic is written once as a macro and instantiated per kind.
        macro_rules! update_retain_history {
            ( $stmt:ident, $opt:ident, $name:ident ) => {{
                // Position of the last RETAIN HISTORY option, if one is present.
                let pos = $stmt
                    .with_options
                    .iter()
                    .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
                if let Some(value) = value {
                    let next = mz_sql_parser::ast::$opt {
                        name: mz_sql_parser::ast::$name::RetainHistory,
                        value: Some(WithOptionValue::RetainHistoryFor(value)),
                    };
                    if let Some(idx) = pos {
                        // Replace in place and hand back the old value.
                        let previous = $stmt.with_options[idx].clone();
                        $stmt.with_options[idx] = next;
                        previous.value
                    } else {
                        $stmt.with_options.push(next);
                        None
                    }
                } else {
                    if let Some(idx) = pos {
                        // `swap_remove` moves the last option into the freed slot, so the
                        // relative order of the remaining options can change.
                        $stmt.with_options.swap_remove(idx).value
                    } else {
                        None
                    }
                }
            }};
        }
        let previous = match &mut ast {
            Statement::CreateTable(stmt) => {
                update_retain_history!(stmt, TableOption, TableOptionName)
            }
            Statement::CreateIndex(stmt) => {
                update_retain_history!(stmt, IndexOption, IndexOptionName)
            }
            Statement::CreateSource(stmt) => {
                update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
            }
            Statement::CreateMaterializedView(stmt) => {
                update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
            }
            _ => {
                return Err(());
            }
        };
        Ok(previous)
    };

    // Rewrite the SQL first; on failure we return before touching the compaction window.
    let res = self.update_sql(update)?;
    let cw = self
        .custom_logical_compaction_window_mut()
        .expect("item must have compaction window");
    *cw = Some(window);
    Ok(res)
}
2078
2079 pub fn add_column(
2080 &mut self,
2081 name: ColumnName,
2082 typ: SqlColumnType,
2083 sql: RawDataType,
2084 ) -> Result<RelationVersion, PlanError> {
2085 let CatalogItem::Table(table) = self else {
2086 return Err(PlanError::Unsupported {
2087 feature: "adding columns to a non-Table".to_string(),
2088 discussion_no: None,
2089 });
2090 };
2091 let next_version = table.desc.add_column(name.clone(), typ);
2092
2093 let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2094 Statement::CreateTable(stmt) => {
2095 let version = ColumnOptionDef {
2096 name: None,
2097 option: ColumnOption::Versioned {
2098 action: ColumnVersioned::Added,
2099 version: next_version.into(),
2100 },
2101 };
2102 let column = ColumnDef {
2103 name: name.into(),
2104 data_type: sql,
2105 collation: None,
2106 options: vec![version],
2107 };
2108 stmt.columns.push(column);
2109 Ok(())
2110 }
2111 _ => Err(()),
2112 };
2113
2114 self.update_sql(update)
2115 .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2116 Ok(next_version)
2117 }
2118
2119 pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2122 where
2123 F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2124 {
2125 let create_sql = match self {
2126 CatalogItem::Table(Table { create_sql, .. })
2127 | CatalogItem::Type(Type { create_sql, .. })
2128 | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2129 CatalogItem::Sink(Sink { create_sql, .. })
2130 | CatalogItem::View(View { create_sql, .. })
2131 | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2132 | CatalogItem::Index(Index { create_sql, .. })
2133 | CatalogItem::Secret(Secret { create_sql, .. })
2134 | CatalogItem::Connection(Connection { create_sql, .. })
2135 | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
2136 CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2137 };
2138 let Some(create_sql) = create_sql else {
2139 return Err(());
2140 };
2141 let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2142 .expect("non-system items must be parseable")
2143 .into_element()
2144 .ast;
2145 debug!("rewrite: {}", ast.to_ast_string_redacted());
2146 let t = f(&mut ast)?;
2147 *create_sql = ast.to_ast_string_stable();
2148 debug!("rewrote: {}", ast.to_ast_string_redacted());
2149 Ok(t)
2150 }
2151
2152 pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2159 match self {
2160 CatalogItem::Index(index) => Some(index.cluster_id),
2161 CatalogItem::Table(_)
2162 | CatalogItem::Source(_)
2163 | CatalogItem::Log(_)
2164 | CatalogItem::View(_)
2165 | CatalogItem::MaterializedView(_)
2166 | CatalogItem::Sink(_)
2167 | CatalogItem::Type(_)
2168 | CatalogItem::Func(_)
2169 | CatalogItem::Secret(_)
2170 | CatalogItem::Connection(_)
2171 | CatalogItem::ContinualTask(_) => None,
2172 }
2173 }
2174
2175 pub fn cluster_id(&self) -> Option<ClusterId> {
2176 match self {
2177 CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2178 CatalogItem::Index(index) => Some(index.cluster_id),
2179 CatalogItem::Source(source) => match &source.data_source {
2180 DataSourceDesc::Ingestion { cluster_id, .. }
2181 | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
2182 DataSourceDesc::IngestionExport { .. } => None,
2186 DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2187 DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => None,
2188 },
2189 CatalogItem::Sink(sink) => Some(sink.cluster_id),
2190 CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
2191 CatalogItem::Table(_)
2192 | CatalogItem::Log(_)
2193 | CatalogItem::View(_)
2194 | CatalogItem::Type(_)
2195 | CatalogItem::Func(_)
2196 | CatalogItem::Secret(_)
2197 | CatalogItem::Connection(_) => None,
2198 }
2199 }
2200
2201 pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2204 match self {
2205 CatalogItem::Table(table) => table.custom_logical_compaction_window,
2206 CatalogItem::Source(source) => source.custom_logical_compaction_window,
2207 CatalogItem::Index(index) => index.custom_logical_compaction_window,
2208 CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2209 CatalogItem::Log(_)
2210 | CatalogItem::View(_)
2211 | CatalogItem::Sink(_)
2212 | CatalogItem::Type(_)
2213 | CatalogItem::Func(_)
2214 | CatalogItem::Secret(_)
2215 | CatalogItem::Connection(_)
2216 | CatalogItem::ContinualTask(_) => None,
2217 }
2218 }
2219
2220 pub fn custom_logical_compaction_window_mut(
2224 &mut self,
2225 ) -> Option<&mut Option<CompactionWindow>> {
2226 let cw = match self {
2227 CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2228 CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2229 CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2230 CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2231 CatalogItem::Log(_)
2232 | CatalogItem::View(_)
2233 | CatalogItem::Sink(_)
2234 | CatalogItem::Type(_)
2235 | CatalogItem::Func(_)
2236 | CatalogItem::Secret(_)
2237 | CatalogItem::Connection(_)
2238 | CatalogItem::ContinualTask(_) => return None,
2239 };
2240 Some(cw)
2241 }
2242
2243 pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2251 let custom_logical_compaction_window = match self {
2252 CatalogItem::Table(_)
2253 | CatalogItem::Source(_)
2254 | CatalogItem::Index(_)
2255 | CatalogItem::MaterializedView(_)
2256 | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
2257 CatalogItem::Log(_)
2258 | CatalogItem::View(_)
2259 | CatalogItem::Sink(_)
2260 | CatalogItem::Type(_)
2261 | CatalogItem::Func(_)
2262 | CatalogItem::Secret(_)
2263 | CatalogItem::Connection(_) => return None,
2264 };
2265 Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2266 }
2267
2268 pub fn is_retained_metrics_object(&self) -> bool {
2272 match self {
2273 CatalogItem::Table(table) => table.is_retained_metrics_object,
2274 CatalogItem::Source(source) => source.is_retained_metrics_object,
2275 CatalogItem::Index(index) => index.is_retained_metrics_object,
2276 CatalogItem::Log(_)
2277 | CatalogItem::View(_)
2278 | CatalogItem::MaterializedView(_)
2279 | CatalogItem::Sink(_)
2280 | CatalogItem::Type(_)
2281 | CatalogItem::Func(_)
2282 | CatalogItem::Secret(_)
2283 | CatalogItem::Connection(_)
2284 | CatalogItem::ContinualTask(_) => false,
2285 }
2286 }
2287
2288 pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2289 match self {
2290 CatalogItem::Table(table) => {
2291 let create_sql = table
2292 .create_sql
2293 .clone()
2294 .expect("builtin tables cannot be serialized");
2295 let mut collections = table.collections.clone();
2296 let global_id = collections
2297 .remove(&RelationVersion::root())
2298 .expect("at least one version");
2299 (create_sql, global_id, collections)
2300 }
2301 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2302 CatalogItem::Source(source) => {
2303 assert!(
2304 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2305 "cannot serialize introspection/builtin sources",
2306 );
2307 let create_sql = source
2308 .create_sql
2309 .clone()
2310 .expect("builtin sources cannot be serialized");
2311 (create_sql, source.global_id, BTreeMap::new())
2312 }
2313 CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2314 CatalogItem::MaterializedView(mview) => {
2315 let mut collections = mview.collections.clone();
2316 let global_id = collections
2317 .remove(&RelationVersion::root())
2318 .expect("at least one version");
2319 (mview.create_sql.clone(), global_id, collections)
2320 }
2321 CatalogItem::Index(index) => {
2322 (index.create_sql.clone(), index.global_id, BTreeMap::new())
2323 }
2324 CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2325 CatalogItem::Type(typ) => {
2326 let create_sql = typ
2327 .create_sql
2328 .clone()
2329 .expect("builtin types cannot be serialized");
2330 (create_sql, typ.global_id, BTreeMap::new())
2331 }
2332 CatalogItem::Secret(secret) => {
2333 (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2334 }
2335 CatalogItem::Connection(connection) => (
2336 connection.create_sql.clone(),
2337 connection.global_id,
2338 BTreeMap::new(),
2339 ),
2340 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2341 CatalogItem::ContinualTask(ct) => {
2342 (ct.create_sql.clone(), ct.global_id, BTreeMap::new())
2343 }
2344 }
2345 }
2346
2347 pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2348 match self {
2349 CatalogItem::Table(mut table) => {
2350 let create_sql = table
2351 .create_sql
2352 .expect("builtin tables cannot be serialized");
2353 let global_id = table
2354 .collections
2355 .remove(&RelationVersion::root())
2356 .expect("at least one version");
2357 (create_sql, global_id, table.collections)
2358 }
2359 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2360 CatalogItem::Source(source) => {
2361 assert!(
2362 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2363 "cannot serialize introspection/builtin sources",
2364 );
2365 let create_sql = source
2366 .create_sql
2367 .expect("builtin sources cannot be serialized");
2368 (create_sql, source.global_id, BTreeMap::new())
2369 }
2370 CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2371 CatalogItem::MaterializedView(mut mview) => {
2372 let global_id = mview
2373 .collections
2374 .remove(&RelationVersion::root())
2375 .expect("at least one version");
2376 (mview.create_sql, global_id, mview.collections)
2377 }
2378 CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2379 CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2380 CatalogItem::Type(typ) => {
2381 let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2382 (create_sql, typ.global_id, BTreeMap::new())
2383 }
2384 CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2385 CatalogItem::Connection(connection) => {
2386 (connection.create_sql, connection.global_id, BTreeMap::new())
2387 }
2388 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2389 CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
2390 }
2391 }
2392
2393 pub fn global_id_for_version(&self, version: RelationVersionSelector) -> Option<GlobalId> {
2396 let collections = match self {
2397 CatalogItem::MaterializedView(mv) => &mv.collections,
2398 CatalogItem::Table(table) => &table.collections,
2399 CatalogItem::Source(source) => return Some(source.global_id),
2400 CatalogItem::Log(log) => return Some(log.global_id),
2401 CatalogItem::View(view) => return Some(view.global_id),
2402 CatalogItem::Sink(sink) => return Some(sink.global_id),
2403 CatalogItem::Index(index) => return Some(index.global_id),
2404 CatalogItem::Type(ty) => return Some(ty.global_id),
2405 CatalogItem::Func(func) => return Some(func.global_id),
2406 CatalogItem::Secret(secret) => return Some(secret.global_id),
2407 CatalogItem::Connection(conn) => return Some(conn.global_id),
2408 CatalogItem::ContinualTask(ct) => return Some(ct.global_id),
2409 };
2410 match version {
2411 RelationVersionSelector::Latest => collections.values().last().copied(),
2412 RelationVersionSelector::Specific(version) => collections.get(&version).copied(),
2413 }
2414 }
2415}
2416
2417impl CatalogEntry {
2418 pub fn desc_latest(
2423 &self,
2424 name: &FullItemName,
2425 ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
2426 self.item.desc(name, RelationVersionSelector::Latest)
2427 }
2428
2429 pub fn desc_opt_latest(&self) -> Option<Cow<'_, RelationDesc>> {
2432 self.item.desc_opt(RelationVersionSelector::Latest)
2433 }
2434
2435 pub fn has_columns(&self) -> bool {
2437 self.desc_opt_latest().is_some()
2438 }
2439
2440 pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2442 self.item.func(self)
2443 }
2444
2445 pub fn index(&self) -> Option<&Index> {
2447 match self.item() {
2448 CatalogItem::Index(idx) => Some(idx),
2449 _ => None,
2450 }
2451 }
2452
2453 pub fn materialized_view(&self) -> Option<&MaterializedView> {
2455 match self.item() {
2456 CatalogItem::MaterializedView(mv) => Some(mv),
2457 _ => None,
2458 }
2459 }
2460
2461 pub fn table(&self) -> Option<&Table> {
2463 match self.item() {
2464 CatalogItem::Table(tbl) => Some(tbl),
2465 _ => None,
2466 }
2467 }
2468
2469 pub fn source(&self) -> Option<&Source> {
2471 match self.item() {
2472 CatalogItem::Source(src) => Some(src),
2473 _ => None,
2474 }
2475 }
2476
2477 pub fn sink(&self) -> Option<&Sink> {
2479 match self.item() {
2480 CatalogItem::Sink(sink) => Some(sink),
2481 _ => None,
2482 }
2483 }
2484
2485 pub fn secret(&self) -> Option<&Secret> {
2487 match self.item() {
2488 CatalogItem::Secret(secret) => Some(secret),
2489 _ => None,
2490 }
2491 }
2492
2493 pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2494 match self.item() {
2495 CatalogItem::Connection(connection) => Ok(connection),
2496 _ => {
2497 let db_name = match self.name().qualifiers.database_spec {
2498 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2499 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2500 };
2501 Err(SqlCatalogError::UnknownConnection(format!(
2502 "{}{}.{}",
2503 db_name,
2504 self.name().qualifiers.schema_spec,
2505 self.name().item
2506 )))
2507 }
2508 }
2509 }
2510
2511 pub fn source_desc(
2514 &self,
2515 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2516 self.item.source_desc(self)
2517 }
2518
2519 pub fn is_connection(&self) -> bool {
2521 matches!(self.item(), CatalogItem::Connection(_))
2522 }
2523
2524 pub fn is_table(&self) -> bool {
2526 matches!(self.item(), CatalogItem::Table(_))
2527 }
2528
2529 pub fn is_source(&self) -> bool {
2532 matches!(self.item(), CatalogItem::Source(_))
2533 }
2534
2535 pub fn subsource_details(
2538 &self,
2539 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2540 match &self.item() {
2541 CatalogItem::Source(source) => match &source.data_source {
2542 DataSourceDesc::IngestionExport {
2543 ingestion_id,
2544 external_reference,
2545 details,
2546 data_config: _,
2547 } => Some((*ingestion_id, external_reference, details)),
2548 _ => None,
2549 },
2550 _ => None,
2551 }
2552 }
2553
2554 pub fn source_export_details(
2557 &self,
2558 ) -> Option<(
2559 CatalogItemId,
2560 &UnresolvedItemName,
2561 &SourceExportDetails,
2562 &SourceExportDataConfig<ReferencedConnection>,
2563 )> {
2564 match &self.item() {
2565 CatalogItem::Source(source) => match &source.data_source {
2566 DataSourceDesc::IngestionExport {
2567 ingestion_id,
2568 external_reference,
2569 details,
2570 data_config,
2571 } => Some((*ingestion_id, external_reference, details, data_config)),
2572 _ => None,
2573 },
2574 CatalogItem::Table(table) => match &table.data_source {
2575 TableDataSource::DataSource {
2576 desc:
2577 DataSourceDesc::IngestionExport {
2578 ingestion_id,
2579 external_reference,
2580 details,
2581 data_config,
2582 },
2583 timeline: _,
2584 } => Some((*ingestion_id, external_reference, details, data_config)),
2585 _ => None,
2586 },
2587 _ => None,
2588 }
2589 }
2590
2591 pub fn is_progress_source(&self) -> bool {
2593 self.item().is_progress_source()
2594 }
2595
2596 pub fn progress_id(&self) -> Option<CatalogItemId> {
2598 match &self.item() {
2599 CatalogItem::Source(source) => match &source.data_source {
2600 DataSourceDesc::Ingestion { .. } => Some(self.id),
2601 DataSourceDesc::OldSyntaxIngestion {
2602 progress_subsource, ..
2603 } => Some(*progress_subsource),
2604 DataSourceDesc::IngestionExport { .. }
2605 | DataSourceDesc::Introspection(_)
2606 | DataSourceDesc::Progress
2607 | DataSourceDesc::Webhook { .. } => None,
2608 },
2609 CatalogItem::Table(_)
2610 | CatalogItem::Log(_)
2611 | CatalogItem::View(_)
2612 | CatalogItem::MaterializedView(_)
2613 | CatalogItem::Sink(_)
2614 | CatalogItem::Index(_)
2615 | CatalogItem::Type(_)
2616 | CatalogItem::Func(_)
2617 | CatalogItem::Secret(_)
2618 | CatalogItem::Connection(_)
2619 | CatalogItem::ContinualTask(_) => None,
2620 }
2621 }
2622
2623 pub fn is_sink(&self) -> bool {
2625 matches!(self.item(), CatalogItem::Sink(_))
2626 }
2627
2628 pub fn is_materialized_view(&self) -> bool {
2630 matches!(self.item(), CatalogItem::MaterializedView(_))
2631 }
2632
2633 pub fn is_view(&self) -> bool {
2635 matches!(self.item(), CatalogItem::View(_))
2636 }
2637
2638 pub fn is_secret(&self) -> bool {
2640 matches!(self.item(), CatalogItem::Secret(_))
2641 }
2642
2643 pub fn is_introspection_source(&self) -> bool {
2645 matches!(self.item(), CatalogItem::Log(_))
2646 }
2647
2648 pub fn is_index(&self) -> bool {
2650 matches!(self.item(), CatalogItem::Index(_))
2651 }
2652
2653 pub fn is_continual_task(&self) -> bool {
2655 matches!(self.item(), CatalogItem::ContinualTask(_))
2656 }
2657
2658 pub fn is_relation(&self) -> bool {
2660 mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2661 }
2662
2663 pub fn references(&self) -> &ResolvedIds {
2666 self.item.references()
2667 }
2668
2669 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2675 self.item.uses()
2676 }
2677
2678 pub fn item(&self) -> &CatalogItem {
2680 &self.item
2681 }
2682
2683 pub fn id(&self) -> CatalogItemId {
2685 self.id
2686 }
2687
2688 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2690 self.item().global_ids()
2691 }
2692
2693 pub fn latest_global_id(&self) -> GlobalId {
2694 self.item().latest_global_id()
2695 }
2696
2697 pub fn oid(&self) -> u32 {
2699 self.oid
2700 }
2701
2702 pub fn name(&self) -> &QualifiedItemName {
2704 &self.name
2705 }
2706
2707 pub fn referenced_by(&self) -> &[CatalogItemId] {
2709 &self.referenced_by
2710 }
2711
2712 pub fn used_by(&self) -> &[CatalogItemId] {
2714 &self.used_by
2715 }
2716
2717 pub fn conn_id(&self) -> Option<&ConnectionId> {
2720 self.item.conn_id()
2721 }
2722
2723 pub fn owner_id(&self) -> &RoleId {
2725 &self.owner_id
2726 }
2727
2728 pub fn privileges(&self) -> &PrivilegeMap {
2730 &self.privileges
2731 }
2732}
2733
/// Comments attached to catalog objects.
///
/// Comments are grouped per [`CommentObjectId`]; within one object they are keyed by an
/// optional sub-component index.
// NOTE(review): `None` presumably addresses the object itself and `Some(i)` one of its parts
// (e.g. a column) — confirm against callers.
#[derive(Debug, Clone, Default)]
pub struct CommentsMap {
    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
}
2738
2739impl CommentsMap {
2740 pub fn update_comment(
2741 &mut self,
2742 object_id: CommentObjectId,
2743 sub_component: Option<usize>,
2744 comment: Option<String>,
2745 ) -> Option<String> {
2746 let object_comments = self.map.entry(object_id).or_default();
2747
2748 let (empty, prev) = if let Some(comment) = comment {
2750 let prev = object_comments.insert(sub_component, comment);
2751 (false, prev)
2752 } else {
2753 let prev = object_comments.remove(&sub_component);
2754 (object_comments.is_empty(), prev)
2755 };
2756
2757 if empty {
2759 self.map.remove(&object_id);
2760 }
2761
2762 prev
2764 }
2765
2766 pub fn drop_comments(
2772 &mut self,
2773 object_ids: &BTreeSet<CommentObjectId>,
2774 ) -> Vec<(CommentObjectId, Option<usize>, String)> {
2775 let mut removed_comments = Vec::new();
2776
2777 for object_id in object_ids {
2778 if let Some(comments) = self.map.remove(object_id) {
2779 let removed = comments
2780 .into_iter()
2781 .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
2782 removed_comments.extend(removed);
2783 }
2784 }
2785
2786 removed_comments
2787 }
2788
2789 pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
2790 self.map
2791 .iter()
2792 .map(|(id, comments)| {
2793 comments
2794 .iter()
2795 .map(|(pos, comment)| (*id, *pos, comment.as_str()))
2796 })
2797 .flatten()
2798 }
2799
2800 pub fn get_object_comments(
2801 &self,
2802 object_id: CommentObjectId,
2803 ) -> Option<&BTreeMap<Option<usize>, String>> {
2804 self.map.get(&object_id)
2805 }
2806}
2807
2808impl Serialize for CommentsMap {
2809 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2810 where
2811 S: serde::Serializer,
2812 {
2813 let comment_count = self
2814 .map
2815 .iter()
2816 .map(|(_object_id, comments)| comments.len())
2817 .sum();
2818
2819 let mut seq = serializer.serialize_seq(Some(comment_count))?;
2820 for (object_id, sub) in &self.map {
2821 for (sub_component, comment) in sub {
2822 seq.serialize_element(&(
2823 format!("{object_id:?}"),
2824 format!("{sub_component:?}"),
2825 comment,
2826 ))?;
2827 }
2828 }
2829 seq.end()
2830 }
2831}
2832
/// Default privileges, grouped by the [`DefaultPrivilegeObject`] they apply to.
///
/// Each object maps to the per-grantee privileges stored in a [`RoleDefaultPrivileges`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
pub struct DefaultPrivileges {
    // The keys are not strings, so they are converted to strings when serializing.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
}
2838
/// Default privileges of a single object, keyed by the grantee role.
///
/// A newtype around the inner map, which lets the inner map carry its own
/// `serialize_with` attribute.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
struct RoleDefaultPrivileges(
    // The key is the grantee of the stored `DefaultPrivilegeAclItem`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
);
2847
2848impl Deref for RoleDefaultPrivileges {
2849 type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
2850
2851 fn deref(&self) -> &Self::Target {
2852 &self.0
2853 }
2854}
2855
2856impl DerefMut for RoleDefaultPrivileges {
2857 fn deref_mut(&mut self) -> &mut Self::Target {
2858 &mut self.0
2859 }
2860}
2861
2862impl DefaultPrivileges {
2863 pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
2865 if privilege.acl_mode.is_empty() {
2866 return;
2867 }
2868
2869 let privileges = self.privileges.entry(object).or_default();
2870 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2871 default_privilege.acl_mode |= privilege.acl_mode;
2872 } else {
2873 privileges.insert(privilege.grantee, privilege);
2874 }
2875 }
2876
2877 pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
2879 if let Some(privileges) = self.privileges.get_mut(object) {
2880 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2881 default_privilege.acl_mode =
2882 default_privilege.acl_mode.difference(privilege.acl_mode);
2883 if default_privilege.acl_mode.is_empty() {
2884 privileges.remove(&privilege.grantee);
2885 }
2886 }
2887 if privileges.is_empty() {
2888 self.privileges.remove(object);
2889 }
2890 }
2891 }
2892
2893 pub fn get_privileges_for_grantee(
2896 &self,
2897 object: &DefaultPrivilegeObject,
2898 grantee: &RoleId,
2899 ) -> Option<&AclMode> {
2900 self.privileges
2901 .get(object)
2902 .and_then(|privileges| privileges.get(grantee))
2903 .map(|privilege| &privilege.acl_mode)
2904 }
2905
2906 pub fn get_applicable_privileges(
2908 &self,
2909 role_id: RoleId,
2910 database_id: Option<DatabaseId>,
2911 schema_id: Option<SchemaId>,
2912 object_type: mz_sql::catalog::ObjectType,
2913 ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
2914 let privilege_object_type = if object_type.is_relation() {
2918 mz_sql::catalog::ObjectType::Table
2919 } else {
2920 object_type
2921 };
2922 let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
2923
2924 [
2928 DefaultPrivilegeObject {
2929 role_id,
2930 database_id,
2931 schema_id,
2932 object_type: privilege_object_type,
2933 },
2934 DefaultPrivilegeObject {
2935 role_id,
2936 database_id,
2937 schema_id: None,
2938 object_type: privilege_object_type,
2939 },
2940 DefaultPrivilegeObject {
2941 role_id,
2942 database_id: None,
2943 schema_id: None,
2944 object_type: privilege_object_type,
2945 },
2946 DefaultPrivilegeObject {
2947 role_id: RoleId::Public,
2948 database_id,
2949 schema_id,
2950 object_type: privilege_object_type,
2951 },
2952 DefaultPrivilegeObject {
2953 role_id: RoleId::Public,
2954 database_id,
2955 schema_id: None,
2956 object_type: privilege_object_type,
2957 },
2958 DefaultPrivilegeObject {
2959 role_id: RoleId::Public,
2960 database_id: None,
2961 schema_id: None,
2962 object_type: privilege_object_type,
2963 },
2964 ]
2965 .into_iter()
2966 .filter_map(|object| self.privileges.get(&object))
2967 .flat_map(|acl_map| acl_map.values())
2968 .fold(
2970 BTreeMap::new(),
2971 |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
2972 let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
2973 *accum_acl_mode |= *acl_mode;
2974 accum
2975 },
2976 )
2977 .into_iter()
2978 .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
2983 .filter(|(_, acl_mode)| !acl_mode.is_empty())
2985 .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
2986 grantee: *grantee,
2987 acl_mode,
2988 })
2989 }
2990
2991 pub fn iter(
2992 &self,
2993 ) -> impl Iterator<
2994 Item = (
2995 &DefaultPrivilegeObject,
2996 impl Iterator<Item = &DefaultPrivilegeAclItem>,
2997 ),
2998 > {
2999 self.privileges
3000 .iter()
3001 .map(|(object, acl_map)| (object, acl_map.values()))
3002 }
3003}
3004
/// In-memory configuration of a cluster; converts to and from `durable::ClusterConfig`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterConfig {
    /// Whether the cluster is managed and, if so, its managed settings.
    pub variant: ClusterVariant,
    /// Optional workload class label.
    pub workload_class: Option<String>,
}
3010
3011impl ClusterConfig {
3012 pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
3013 match &self.variant {
3014 ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
3015 ClusterVariant::Unmanaged => None,
3016 }
3017 }
3018}
3019
3020impl From<ClusterConfig> for durable::ClusterConfig {
3021 fn from(config: ClusterConfig) -> Self {
3022 Self {
3023 variant: config.variant.into(),
3024 workload_class: config.workload_class,
3025 }
3026 }
3027}
3028
3029impl From<durable::ClusterConfig> for ClusterConfig {
3030 fn from(config: durable::ClusterConfig) -> Self {
3031 Self {
3032 variant: config.variant.into(),
3033 workload_class: config.workload_class,
3034 }
3035 }
3036}
3037
/// Settings of a managed cluster; converts to and from `durable::ClusterVariantManaged`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterVariantManaged {
    /// Size of the cluster, as a string.
    pub size: String,
    /// Availability zones configured for the cluster.
    pub availability_zones: Vec<String>,
    /// Logging configuration.
    pub logging: ReplicaLogging,
    /// Replication factor of the cluster.
    pub replication_factor: u32,
    /// Optimizer feature overrides; surfaced through `ClusterConfig::features`.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// Schedule of the cluster.
    pub schedule: ClusterSchedule,
}
3047
3048impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3049 fn from(managed: ClusterVariantManaged) -> Self {
3050 Self {
3051 size: managed.size,
3052 availability_zones: managed.availability_zones,
3053 logging: managed.logging,
3054 replication_factor: managed.replication_factor,
3055 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3056 schedule: managed.schedule,
3057 }
3058 }
3059}
3060
3061impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3062 fn from(managed: durable::ClusterVariantManaged) -> Self {
3063 Self {
3064 size: managed.size,
3065 availability_zones: managed.availability_zones,
3066 logging: managed.logging,
3067 replication_factor: managed.replication_factor,
3068 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3069 schedule: managed.schedule,
3070 }
3071 }
3072}
3073
/// Distinguishes managed clusters from unmanaged ones; converts to and from
/// `durable::ClusterVariant`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterVariant {
    /// A managed cluster together with its settings.
    Managed(ClusterVariantManaged),
    /// An unmanaged cluster; carries no settings.
    Unmanaged,
}
3079
3080impl From<ClusterVariant> for durable::ClusterVariant {
3081 fn from(variant: ClusterVariant) -> Self {
3082 match variant {
3083 ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3084 ClusterVariant::Unmanaged => Self::Unmanaged,
3085 }
3086 }
3087}
3088
3089impl From<durable::ClusterVariant> for ClusterVariant {
3090 fn from(variant: durable::ClusterVariant) -> Self {
3091 match variant {
3092 durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3093 durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3094 }
3095 }
3096}
3097
3098impl mz_sql::catalog::CatalogDatabase for Database {
3099 fn name(&self) -> &str {
3100 &self.name
3101 }
3102
3103 fn id(&self) -> DatabaseId {
3104 self.id
3105 }
3106
3107 fn has_schemas(&self) -> bool {
3108 !self.schemas_by_name.is_empty()
3109 }
3110
3111 fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
3112 &self.schemas_by_name
3113 }
3114
3115 #[allow(clippy::as_conversions)]
3117 fn schemas(&self) -> Vec<&dyn CatalogSchema> {
3118 self.schemas_by_id
3119 .values()
3120 .map(|schema| schema as &dyn CatalogSchema)
3121 .collect()
3122 }
3123
3124 fn owner_id(&self) -> RoleId {
3125 self.owner_id
3126 }
3127
3128 fn privileges(&self) -> &PrivilegeMap {
3129 &self.privileges
3130 }
3131}
3132
3133impl mz_sql::catalog::CatalogSchema for Schema {
3134 fn database(&self) -> &ResolvedDatabaseSpecifier {
3135 &self.name.database
3136 }
3137
3138 fn name(&self) -> &QualifiedSchemaName {
3139 &self.name
3140 }
3141
3142 fn id(&self) -> &SchemaSpecifier {
3143 &self.id
3144 }
3145
3146 fn has_items(&self) -> bool {
3147 !self.items.is_empty()
3148 }
3149
3150 fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
3151 Box::new(
3152 self.items
3153 .values()
3154 .chain(self.functions.values())
3155 .chain(self.types.values())
3156 .copied(),
3157 )
3158 }
3159
3160 fn owner_id(&self) -> RoleId {
3161 self.owner_id
3162 }
3163
3164 fn privileges(&self) -> &PrivilegeMap {
3165 &self.privileges
3166 }
3167}
3168
3169impl mz_sql::catalog::CatalogRole for Role {
3170 fn name(&self) -> &str {
3171 &self.name
3172 }
3173
3174 fn id(&self) -> RoleId {
3175 self.id
3176 }
3177
3178 fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
3179 &self.membership.map
3180 }
3181
3182 fn attributes(&self) -> &RoleAttributes {
3183 &self.attributes
3184 }
3185
3186 fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
3187 &self.vars.map
3188 }
3189}
3190
3191impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
3192 fn name(&self) -> &str {
3193 &self.name
3194 }
3195
3196 fn id(&self) -> NetworkPolicyId {
3197 self.id
3198 }
3199
3200 fn owner_id(&self) -> RoleId {
3201 self.owner_id
3202 }
3203
3204 fn privileges(&self) -> &PrivilegeMap {
3205 &self.privileges
3206 }
3207}
3208
3209impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
3210 fn name(&self) -> &str {
3211 &self.name
3212 }
3213
3214 fn id(&self) -> ClusterId {
3215 self.id
3216 }
3217
3218 fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
3219 &self.bound_objects
3220 }
3221
3222 fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
3223 &self.replica_id_by_name_
3224 }
3225
3226 #[allow(clippy::as_conversions)]
3228 fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
3229 self.replicas()
3230 .map(|replica| replica as &dyn CatalogClusterReplica)
3231 .collect()
3232 }
3233
3234 fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
3235 self.replica(id).expect("catalog out of sync")
3236 }
3237
3238 fn owner_id(&self) -> RoleId {
3239 self.owner_id
3240 }
3241
3242 fn privileges(&self) -> &PrivilegeMap {
3243 &self.privileges
3244 }
3245
3246 fn is_managed(&self) -> bool {
3247 self.is_managed()
3248 }
3249
3250 fn managed_size(&self) -> Option<&str> {
3251 match &self.config.variant {
3252 ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
3253 _ => None,
3254 }
3255 }
3256
3257 fn schedule(&self) -> Option<&ClusterSchedule> {
3258 match &self.config.variant {
3259 ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
3260 _ => None,
3261 }
3262 }
3263
3264 fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
3265 self.try_to_plan()
3266 }
3267}
3268
3269impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
3270 fn name(&self) -> &str {
3271 &self.name
3272 }
3273
3274 fn cluster_id(&self) -> ClusterId {
3275 self.cluster_id
3276 }
3277
3278 fn replica_id(&self) -> ReplicaId {
3279 self.replica_id
3280 }
3281
3282 fn owner_id(&self) -> RoleId {
3283 self.owner_id
3284 }
3285
3286 fn internal(&self) -> bool {
3287 self.config.location.internal()
3288 }
3289}
3290
3291impl mz_sql::catalog::CatalogItem for CatalogEntry {
3292 fn name(&self) -> &QualifiedItemName {
3293 self.name()
3294 }
3295
3296 fn id(&self) -> CatalogItemId {
3297 self.id()
3298 }
3299
3300 fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3301 Box::new(self.global_ids())
3302 }
3303
3304 fn oid(&self) -> u32 {
3305 self.oid()
3306 }
3307
3308 fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3309 self.func()
3310 }
3311
3312 fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3313 self.source_desc()
3314 }
3315
3316 fn connection(
3317 &self,
3318 ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3319 {
3320 Ok(self.connection()?.details.to_connection())
3321 }
3322
3323 fn create_sql(&self) -> &str {
3324 match self.item() {
3325 CatalogItem::Table(Table { create_sql, .. }) => {
3326 create_sql.as_deref().unwrap_or("<builtin>")
3327 }
3328 CatalogItem::Source(Source { create_sql, .. }) => {
3329 create_sql.as_deref().unwrap_or("<builtin>")
3330 }
3331 CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3332 CatalogItem::View(View { create_sql, .. }) => create_sql,
3333 CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3334 CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3335 CatalogItem::Type(Type { create_sql, .. }) => {
3336 create_sql.as_deref().unwrap_or("<builtin>")
3337 }
3338 CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3339 CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3340 CatalogItem::Func(_) => "<builtin>",
3341 CatalogItem::Log(_) => "<builtin>",
3342 CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
3343 }
3344 }
3345
3346 fn item_type(&self) -> SqlCatalogItemType {
3347 self.item().typ()
3348 }
3349
3350 fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3351 if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3352 Some((keys, *on))
3353 } else {
3354 None
3355 }
3356 }
3357
3358 fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3359 if let CatalogItem::Table(Table {
3360 data_source: TableDataSource::TableWrites { defaults },
3361 ..
3362 }) = self.item()
3363 {
3364 Some(defaults.as_slice())
3365 } else {
3366 None
3367 }
3368 }
3369
3370 fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3371 if let CatalogItem::Type(Type { details, .. }) = self.item() {
3372 Some(details)
3373 } else {
3374 None
3375 }
3376 }
3377
3378 fn references(&self) -> &ResolvedIds {
3379 self.references()
3380 }
3381
3382 fn uses(&self) -> BTreeSet<CatalogItemId> {
3383 self.uses()
3384 }
3385
3386 fn referenced_by(&self) -> &[CatalogItemId] {
3387 self.referenced_by()
3388 }
3389
3390 fn used_by(&self) -> &[CatalogItemId] {
3391 self.used_by()
3392 }
3393
3394 fn subsource_details(
3395 &self,
3396 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3397 self.subsource_details()
3398 }
3399
3400 fn source_export_details(
3401 &self,
3402 ) -> Option<(
3403 CatalogItemId,
3404 &UnresolvedItemName,
3405 &SourceExportDetails,
3406 &SourceExportDataConfig<ReferencedConnection>,
3407 )> {
3408 self.source_export_details()
3409 }
3410
3411 fn is_progress_source(&self) -> bool {
3412 self.is_progress_source()
3413 }
3414
3415 fn progress_id(&self) -> Option<CatalogItemId> {
3416 self.progress_id()
3417 }
3418
3419 fn owner_id(&self) -> RoleId {
3420 self.owner_id
3421 }
3422
3423 fn privileges(&self) -> &PrivilegeMap {
3424 &self.privileges
3425 }
3426
3427 fn cluster_id(&self) -> Option<ClusterId> {
3428 self.item().cluster_id()
3429 }
3430
3431 fn at_version(
3432 &self,
3433 version: RelationVersionSelector,
3434 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3435 Box::new(CatalogCollectionEntry {
3436 entry: self.clone(),
3437 version,
3438 })
3439 }
3440
3441 fn latest_version(&self) -> Option<RelationVersion> {
3442 self.table().map(|t| t.desc.latest_version())
3443 }
3444}
3445
/// A single update to the catalog state.
#[derive(Debug)]
pub struct StateUpdate {
    /// What is being updated, including its contents.
    pub kind: StateUpdateKind,
    /// Timestamp attached to the update.
    pub ts: Timestamp,
    /// Whether `kind` is being added or retracted.
    pub diff: StateDiff,
}
3453
/// The contents of a single state update, one variant per kind of catalog object.
///
/// All variants wrap a durable catalog object except `SystemPrivilege`, which wraps an
/// [`MzAclItem`], and `TemporaryItem`, which wraps the in-memory [`TemporaryItem`].
#[derive(Debug, Clone)]
pub enum StateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    // The only variant without a counterpart in `BootstrapStateUpdateKind`.
    TemporaryItem(TemporaryItem),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3482
/// Direction of a state update: converts to and from a [`Diff`] of `-1` or `+1`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub enum StateDiff {
    /// Corresponds to `Diff::MINUS_ONE`.
    Retraction,
    /// Corresponds to `Diff::ONE`.
    Addition,
}
3489
3490impl From<StateDiff> for Diff {
3491 fn from(diff: StateDiff) -> Self {
3492 match diff {
3493 StateDiff::Retraction => Diff::MINUS_ONE,
3494 StateDiff::Addition => Diff::ONE,
3495 }
3496 }
3497}
3498impl TryFrom<Diff> for StateDiff {
3499 type Error = String;
3500
3501 fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3502 match diff {
3503 Diff::MINUS_ONE => Ok(Self::Retraction),
3504 Diff::ONE => Ok(Self::Addition),
3505 diff => Err(format!("invalid diff {diff}")),
3506 }
3507 }
3508}
3509
/// In-memory representation of a temporary item carried by
/// `StateUpdateKind::TemporaryItem`.
///
/// Can be built from a [`CatalogEntry`] by taking over the fields listed here.
#[derive(Debug, Clone)]
pub struct TemporaryItem {
    /// Catalog item ID.
    pub id: CatalogItemId,
    /// OID of the item.
    pub oid: u32,
    /// Qualified name of the item.
    pub name: QualifiedItemName,
    /// The item itself.
    pub item: CatalogItem,
    /// Role that owns the item.
    pub owner_id: RoleId,
    /// Privileges stored on the item.
    pub privileges: PrivilegeMap,
}
3520
3521impl From<CatalogEntry> for TemporaryItem {
3522 fn from(entry: CatalogEntry) -> Self {
3523 TemporaryItem {
3524 id: entry.id,
3525 oid: entry.oid,
3526 name: entry.name,
3527 item: entry.item,
3528 owner_id: entry.owner_id,
3529 privileges: entry.privileges,
3530 }
3531 }
3532}
3533
/// Mirror of [`StateUpdateKind`] without the `TemporaryItem` variant.
///
/// Unlike `StateUpdateKind`, this type also derives `Ord`, `PartialOrd`, `Eq`, and
/// `PartialEq`. Converting to `StateUpdateKind` always succeeds; the reverse conversion fails
/// only for `TemporaryItem`.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum BootstrapStateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3557
3558impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3559 fn from(value: BootstrapStateUpdateKind) -> Self {
3560 match value {
3561 BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3562 BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3563 BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3564 BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3565 BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3566 StateUpdateKind::DefaultPrivilege(kind)
3567 }
3568 BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3569 StateUpdateKind::SystemPrivilege(kind)
3570 }
3571 BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3572 StateUpdateKind::SystemConfiguration(kind)
3573 }
3574 BootstrapStateUpdateKind::SourceReferences(kind) => {
3575 StateUpdateKind::SourceReferences(kind)
3576 }
3577 BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3578 BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3579 BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3580 StateUpdateKind::IntrospectionSourceIndex(kind)
3581 }
3582 BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3583 BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3584 StateUpdateKind::SystemObjectMapping(kind)
3585 }
3586 BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3587 BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3588 BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3589 BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3590 StateUpdateKind::StorageCollectionMetadata(kind)
3591 }
3592 BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3593 StateUpdateKind::UnfinalizedShard(kind)
3594 }
3595 }
3596 }
3597}
3598
3599impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3600 type Error = TemporaryItem;
3601
3602 fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3603 match value {
3604 StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3605 StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3606 StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3607 StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3608 StateUpdateKind::DefaultPrivilege(kind) => {
3609 Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3610 }
3611 StateUpdateKind::SystemPrivilege(kind) => {
3612 Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3613 }
3614 StateUpdateKind::SystemConfiguration(kind) => {
3615 Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3616 }
3617 StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3618 StateUpdateKind::NetworkPolicy(kind) => {
3619 Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3620 }
3621 StateUpdateKind::IntrospectionSourceIndex(kind) => {
3622 Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3623 }
3624 StateUpdateKind::ClusterReplica(kind) => {
3625 Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3626 }
3627 StateUpdateKind::SourceReferences(kind) => {
3628 Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3629 }
3630 StateUpdateKind::SystemObjectMapping(kind) => {
3631 Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3632 }
3633 StateUpdateKind::TemporaryItem(kind) => Err(kind),
3634 StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3635 StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3636 StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3637 StateUpdateKind::StorageCollectionMetadata(kind) => {
3638 Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3639 }
3640 StateUpdateKind::UnfinalizedShard(kind) => {
3641 Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3642 }
3643 }
3644 }
3645}