1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18
19use chrono::{DateTime, Utc};
20use mz_adapter_types::compaction::CompactionWindow;
21use mz_adapter_types::connection::ConnectionId;
22use mz_compute_client::logging::LogVariant;
23use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
24use mz_controller_types::{ClusterId, ReplicaId};
25use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
26use mz_ore::collections::CollectionExt;
27use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
28use mz_repr::network_policy_id::NetworkPolicyId;
29use mz_repr::optimize::OptimizerFeatureOverrides;
30use mz_repr::refresh_schedule::RefreshSchedule;
31use mz_repr::role_id::RoleId;
32use mz_repr::{
33 CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
34 RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
35};
36use mz_sql::ast::display::AstDisplay;
37use mz_sql::ast::{
38 ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
39 UnresolvedItemName, Value, WithOptionValue,
40};
41use mz_sql::catalog::{
42 CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
43 CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogTypeDetails,
44 DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, RoleAttributes, RoleMembership,
45 RoleVars, SystemObjectType,
46};
47use mz_sql::names::{
48 Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
49 QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
50};
51use mz_sql::plan::{
52 ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
53 CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
54 HirRelationExpr, NetworkPolicyRule, PlanError, WebhookBodyFormat, WebhookHeaders,
55 WebhookValidation,
56};
57use mz_sql::rbac;
58use mz_sql::session::vars::OwnedVarInput;
59use mz_storage_client::controller::IntrospectionType;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
62use mz_storage_types::sources::load_generator::LoadGenerator;
63use mz_storage_types::sources::{
64 GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
65 SourceExportDetails, Timeline,
66};
67use serde::ser::SerializeSeq;
68use serde::{Deserialize, Serialize};
69use timely::progress::Antichain;
70use tracing::debug;
71
72use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
73use crate::durable;
74
75pub trait UpdateFrom<T>: From<T> {
77 fn update_from(&mut self, from: T);
78}
79
80#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
81pub struct Database {
82 pub name: String,
83 pub id: DatabaseId,
84 pub oid: u32,
85 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
86 pub schemas_by_id: BTreeMap<SchemaId, Schema>,
87 pub schemas_by_name: BTreeMap<String, SchemaId>,
88 pub owner_id: RoleId,
89 pub privileges: PrivilegeMap,
90}
91
92impl From<Database> for durable::Database {
93 fn from(database: Database) -> durable::Database {
94 durable::Database {
95 id: database.id,
96 oid: database.oid,
97 name: database.name,
98 owner_id: database.owner_id,
99 privileges: database.privileges.into_all_values().collect(),
100 }
101 }
102}
103
104impl From<durable::Database> for Database {
105 fn from(
106 durable::Database {
107 id,
108 oid,
109 name,
110 owner_id,
111 privileges,
112 }: durable::Database,
113 ) -> Database {
114 Database {
115 id,
116 oid,
117 schemas_by_id: BTreeMap::new(),
118 schemas_by_name: BTreeMap::new(),
119 name,
120 owner_id,
121 privileges: PrivilegeMap::from_mz_acl_items(privileges),
122 }
123 }
124}
125
126impl UpdateFrom<durable::Database> for Database {
127 fn update_from(
128 &mut self,
129 durable::Database {
130 id,
131 oid,
132 name,
133 owner_id,
134 privileges,
135 }: durable::Database,
136 ) {
137 self.id = id;
138 self.oid = oid;
139 self.name = name;
140 self.owner_id = owner_id;
141 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
142 }
143}
144
145#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
146pub struct Schema {
147 pub name: QualifiedSchemaName,
148 pub id: SchemaSpecifier,
149 pub oid: u32,
150 pub items: BTreeMap<String, CatalogItemId>,
151 pub functions: BTreeMap<String, CatalogItemId>,
152 pub types: BTreeMap<String, CatalogItemId>,
153 pub owner_id: RoleId,
154 pub privileges: PrivilegeMap,
155}
156
157impl From<Schema> for durable::Schema {
158 fn from(schema: Schema) -> durable::Schema {
159 durable::Schema {
160 id: schema.id.into(),
161 oid: schema.oid,
162 name: schema.name.schema,
163 database_id: schema.name.database.id(),
164 owner_id: schema.owner_id,
165 privileges: schema.privileges.into_all_values().collect(),
166 }
167 }
168}
169
170impl From<durable::Schema> for Schema {
171 fn from(
172 durable::Schema {
173 id,
174 oid,
175 name,
176 database_id,
177 owner_id,
178 privileges,
179 }: durable::Schema,
180 ) -> Schema {
181 Schema {
182 name: QualifiedSchemaName {
183 database: database_id.into(),
184 schema: name,
185 },
186 id: id.into(),
187 oid,
188 items: BTreeMap::new(),
189 functions: BTreeMap::new(),
190 types: BTreeMap::new(),
191 owner_id,
192 privileges: PrivilegeMap::from_mz_acl_items(privileges),
193 }
194 }
195}
196
197impl UpdateFrom<durable::Schema> for Schema {
198 fn update_from(
199 &mut self,
200 durable::Schema {
201 id,
202 oid,
203 name,
204 database_id,
205 owner_id,
206 privileges,
207 }: durable::Schema,
208 ) {
209 self.name = QualifiedSchemaName {
210 database: database_id.into(),
211 schema: name,
212 };
213 self.id = id.into();
214 self.oid = oid;
215 self.owner_id = owner_id;
216 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
217 }
218}
219
220#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
221pub struct Role {
222 pub name: String,
223 pub id: RoleId,
224 pub oid: u32,
225 pub attributes: RoleAttributes,
226 pub membership: RoleMembership,
227 pub vars: RoleVars,
228}
229
230impl Role {
231 pub fn is_user(&self) -> bool {
232 self.id.is_user()
233 }
234
235 pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
236 self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
237 }
238}
239
240impl From<Role> for durable::Role {
241 fn from(role: Role) -> durable::Role {
242 durable::Role {
243 id: role.id,
244 oid: role.oid,
245 name: role.name,
246 attributes: role.attributes,
247 membership: role.membership,
248 vars: role.vars,
249 }
250 }
251}
252
253impl From<durable::Role> for Role {
254 fn from(
255 durable::Role {
256 id,
257 oid,
258 name,
259 attributes,
260 membership,
261 vars,
262 }: durable::Role,
263 ) -> Self {
264 Role {
265 name,
266 id,
267 oid,
268 attributes,
269 membership,
270 vars,
271 }
272 }
273}
274
275impl UpdateFrom<durable::Role> for Role {
276 fn update_from(
277 &mut self,
278 durable::Role {
279 id,
280 oid,
281 name,
282 attributes,
283 membership,
284 vars,
285 }: durable::Role,
286 ) {
287 self.id = id;
288 self.oid = oid;
289 self.name = name;
290 self.attributes = attributes;
291 self.membership = membership;
292 self.vars = vars;
293 }
294}
295
296#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
297pub struct RoleAuth {
298 pub role_id: RoleId,
299 pub password_hash: Option<String>,
300 pub updated_at: u64,
301}
302
303impl From<RoleAuth> for durable::RoleAuth {
304 fn from(role_auth: RoleAuth) -> durable::RoleAuth {
305 durable::RoleAuth {
306 role_id: role_auth.role_id,
307 password_hash: role_auth.password_hash,
308 updated_at: role_auth.updated_at,
309 }
310 }
311}
312
313impl From<durable::RoleAuth> for RoleAuth {
314 fn from(
315 durable::RoleAuth {
316 role_id,
317 password_hash,
318 updated_at,
319 }: durable::RoleAuth,
320 ) -> RoleAuth {
321 RoleAuth {
322 role_id,
323 password_hash,
324 updated_at,
325 }
326 }
327}
328
329impl UpdateFrom<durable::RoleAuth> for RoleAuth {
330 fn update_from(&mut self, from: durable::RoleAuth) {
331 self.role_id = from.role_id;
332 self.password_hash = from.password_hash;
333 }
334}
335
336#[derive(Debug, Serialize, Clone, PartialEq)]
337pub struct Cluster {
338 pub name: String,
339 pub id: ClusterId,
340 pub config: ClusterConfig,
341 #[serde(skip)]
342 pub log_indexes: BTreeMap<LogVariant, GlobalId>,
343 pub bound_objects: BTreeSet<CatalogItemId>,
346 pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
347 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
348 pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
349 pub owner_id: RoleId,
350 pub privileges: PrivilegeMap,
351}
352
353impl Cluster {
354 pub fn role(&self) -> ClusterRole {
356 if self.name == MZ_SYSTEM_CLUSTER.name {
359 ClusterRole::SystemCritical
360 } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
361 ClusterRole::System
362 } else {
363 ClusterRole::User
364 }
365 }
366
367 pub fn is_managed(&self) -> bool {
369 matches!(self.config.variant, ClusterVariant::Managed { .. })
370 }
371
372 pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
374 self.replicas().filter(|r| !r.config.location.internal())
375 }
376
377 pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
379 self.replicas_by_id_.values()
380 }
381
382 pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
384 self.replicas_by_id_.get(&replica_id)
385 }
386
387 pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
389 self.replica_id_by_name_.get(name).copied()
390 }
391
392 pub fn availability_zones(&self) -> Option<&[String]> {
394 match &self.config.variant {
395 ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
396 ClusterVariant::Unmanaged => None,
397 }
398 }
399
400 pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
401 let name = self.name.clone();
402 let variant = match &self.config.variant {
403 ClusterVariant::Managed(ClusterVariantManaged {
404 size,
405 availability_zones,
406 logging,
407 replication_factor,
408 optimizer_feature_overrides,
409 schedule,
410 }) => {
411 let introspection = match logging {
412 ReplicaLogging {
413 log_logging,
414 interval: Some(interval),
415 } => Some(ComputeReplicaIntrospectionConfig {
416 debugging: *log_logging,
417 interval: interval.clone(),
418 }),
419 ReplicaLogging {
420 log_logging: _,
421 interval: None,
422 } => None,
423 };
424 let compute = ComputeReplicaConfig { introspection };
425 CreateClusterVariant::Managed(CreateClusterManagedPlan {
426 replication_factor: replication_factor.clone(),
427 size: size.clone(),
428 availability_zones: availability_zones.clone(),
429 compute,
430 optimizer_feature_overrides: optimizer_feature_overrides.clone(),
431 schedule: schedule.clone(),
432 })
433 }
434 ClusterVariant::Unmanaged => {
435 return Err(PlanError::Unsupported {
438 feature: "SHOW CREATE for unmanaged clusters".to_string(),
439 discussion_no: None,
440 });
441 }
442 };
443 let workload_class = self.config.workload_class.clone();
444 Ok(CreateClusterPlan {
445 name,
446 variant,
447 workload_class,
448 })
449 }
450}
451
452impl From<Cluster> for durable::Cluster {
453 fn from(cluster: Cluster) -> durable::Cluster {
454 durable::Cluster {
455 id: cluster.id,
456 name: cluster.name,
457 owner_id: cluster.owner_id,
458 privileges: cluster.privileges.into_all_values().collect(),
459 config: cluster.config.into(),
460 }
461 }
462}
463
464impl From<durable::Cluster> for Cluster {
465 fn from(
466 durable::Cluster {
467 id,
468 name,
469 owner_id,
470 privileges,
471 config,
472 }: durable::Cluster,
473 ) -> Self {
474 Cluster {
475 name: name.clone(),
476 id,
477 bound_objects: BTreeSet::new(),
478 log_indexes: BTreeMap::new(),
479 replica_id_by_name_: BTreeMap::new(),
480 replicas_by_id_: BTreeMap::new(),
481 owner_id,
482 privileges: PrivilegeMap::from_mz_acl_items(privileges),
483 config: config.into(),
484 }
485 }
486}
487
488impl UpdateFrom<durable::Cluster> for Cluster {
489 fn update_from(
490 &mut self,
491 durable::Cluster {
492 id,
493 name,
494 owner_id,
495 privileges,
496 config,
497 }: durable::Cluster,
498 ) {
499 self.id = id;
500 self.name = name;
501 self.owner_id = owner_id;
502 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
503 self.config = config.into();
504 }
505}
506
507#[derive(Debug, Serialize, Clone, PartialEq)]
508pub struct ClusterReplica {
509 pub name: String,
510 pub cluster_id: ClusterId,
511 pub replica_id: ReplicaId,
512 pub config: ReplicaConfig,
513 pub owner_id: RoleId,
514}
515
516impl From<ClusterReplica> for durable::ClusterReplica {
517 fn from(replica: ClusterReplica) -> durable::ClusterReplica {
518 durable::ClusterReplica {
519 cluster_id: replica.cluster_id,
520 replica_id: replica.replica_id,
521 name: replica.name,
522 config: replica.config.into(),
523 owner_id: replica.owner_id,
524 }
525 }
526}
527
528#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
529pub struct ClusterReplicaProcessStatus {
530 pub status: ClusterStatus,
531 pub time: DateTime<Utc>,
532}
533
534#[derive(Debug, Serialize, Clone, PartialEq)]
535pub struct SourceReferences {
536 pub updated_at: u64,
537 pub references: Vec<SourceReference>,
538}
539
540#[derive(Debug, Serialize, Clone, PartialEq)]
541pub struct SourceReference {
542 pub name: String,
543 pub namespace: Option<String>,
544 pub columns: Vec<String>,
545}
546
547impl From<SourceReference> for durable::SourceReference {
548 fn from(source_reference: SourceReference) -> durable::SourceReference {
549 durable::SourceReference {
550 name: source_reference.name,
551 namespace: source_reference.namespace,
552 columns: source_reference.columns,
553 }
554 }
555}
556
557impl SourceReferences {
558 pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
559 durable::SourceReferences {
560 source_id,
561 updated_at: self.updated_at,
562 references: self.references.into_iter().map(Into::into).collect(),
563 }
564 }
565}
566
567impl From<durable::SourceReference> for SourceReference {
568 fn from(source_reference: durable::SourceReference) -> SourceReference {
569 SourceReference {
570 name: source_reference.name,
571 namespace: source_reference.namespace,
572 columns: source_reference.columns,
573 }
574 }
575}
576
577impl From<durable::SourceReferences> for SourceReferences {
578 fn from(source_references: durable::SourceReferences) -> SourceReferences {
579 SourceReferences {
580 updated_at: source_references.updated_at,
581 references: source_references
582 .references
583 .into_iter()
584 .map(|source_reference| source_reference.into())
585 .collect(),
586 }
587 }
588}
589
590impl From<mz_sql::plan::SourceReference> for SourceReference {
591 fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
592 SourceReference {
593 name: source_reference.name,
594 namespace: source_reference.namespace,
595 columns: source_reference.columns,
596 }
597 }
598}
599
600impl From<mz_sql::plan::SourceReferences> for SourceReferences {
601 fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
602 SourceReferences {
603 updated_at: source_references.updated_at,
604 references: source_references
605 .references
606 .into_iter()
607 .map(|source_reference| source_reference.into())
608 .collect(),
609 }
610 }
611}
612
613impl From<SourceReferences> for mz_sql::plan::SourceReferences {
614 fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
615 mz_sql::plan::SourceReferences {
616 updated_at: source_references.updated_at,
617 references: source_references
618 .references
619 .into_iter()
620 .map(|source_reference| source_reference.into())
621 .collect(),
622 }
623 }
624}
625
626impl From<SourceReference> for mz_sql::plan::SourceReference {
627 fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
628 mz_sql::plan::SourceReference {
629 name: source_reference.name,
630 namespace: source_reference.namespace,
631 columns: source_reference.columns,
632 }
633 }
634}
635
636#[derive(Clone, Debug, Serialize)]
637pub struct CatalogEntry {
638 pub item: CatalogItem,
639 #[serde(skip)]
640 pub referenced_by: Vec<CatalogItemId>,
641 #[serde(skip)]
645 pub used_by: Vec<CatalogItemId>,
646 pub id: CatalogItemId,
647 pub oid: u32,
648 pub name: QualifiedItemName,
649 pub owner_id: RoleId,
650 pub privileges: PrivilegeMap,
651}
652
653#[derive(Clone, Debug)]
671pub struct CatalogCollectionEntry {
672 pub entry: CatalogEntry,
673 pub version: RelationVersionSelector,
674}
675
676impl CatalogCollectionEntry {
677 pub fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
678 self.item().desc(name, self.version)
679 }
680
681 pub fn desc_opt(&self) -> Option<Cow<'_, RelationDesc>> {
682 self.item().desc_opt(self.version)
683 }
684}
685
686impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
687 fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
688 CatalogCollectionEntry::desc(self, name)
689 }
690
691 fn global_id(&self) -> GlobalId {
692 match self.entry.item() {
693 CatalogItem::Source(source) => source.global_id,
694 CatalogItem::Log(log) => log.global_id,
695 CatalogItem::View(view) => view.global_id,
696 CatalogItem::MaterializedView(mv) => mv.global_id,
697 CatalogItem::Sink(sink) => sink.global_id,
698 CatalogItem::Index(index) => index.global_id,
699 CatalogItem::Type(ty) => ty.global_id,
700 CatalogItem::Func(func) => func.global_id,
701 CatalogItem::Secret(secret) => secret.global_id,
702 CatalogItem::Connection(conn) => conn.global_id,
703 CatalogItem::ContinualTask(ct) => ct.global_id,
704 CatalogItem::Table(table) => match self.version {
705 RelationVersionSelector::Latest => {
706 let (_version, gid) = table
707 .collections
708 .last_key_value()
709 .expect("at least one version");
710 *gid
711 }
712 RelationVersionSelector::Specific(version) => table
713 .collections
714 .get(&version)
715 .expect("catalog corruption, missing version!")
716 .clone(),
717 },
718 }
719 }
720}
721
722impl Deref for CatalogCollectionEntry {
723 type Target = CatalogEntry;
724
725 fn deref(&self) -> &CatalogEntry {
726 &self.entry
727 }
728}
729
730impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
731 fn name(&self) -> &QualifiedItemName {
732 self.entry.name()
733 }
734
735 fn id(&self) -> CatalogItemId {
736 self.entry.id()
737 }
738
739 fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
740 Box::new(self.entry.global_ids())
741 }
742
743 fn oid(&self) -> u32 {
744 self.entry.oid()
745 }
746
747 fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
748 self.entry.func()
749 }
750
751 fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
752 self.entry.source_desc()
753 }
754
755 fn connection(
756 &self,
757 ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
758 {
759 mz_sql::catalog::CatalogItem::connection(&self.entry)
760 }
761
762 fn create_sql(&self) -> &str {
763 self.entry.create_sql()
764 }
765
766 fn item_type(&self) -> SqlCatalogItemType {
767 self.entry.item_type()
768 }
769
770 fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
771 self.entry.index_details()
772 }
773
774 fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
775 self.entry.writable_table_details()
776 }
777
778 fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
779 self.entry.type_details()
780 }
781
782 fn references(&self) -> &ResolvedIds {
783 self.entry.references()
784 }
785
786 fn uses(&self) -> BTreeSet<CatalogItemId> {
787 self.entry.uses()
788 }
789
790 fn referenced_by(&self) -> &[CatalogItemId] {
791 self.entry.referenced_by()
792 }
793
794 fn used_by(&self) -> &[CatalogItemId] {
795 self.entry.used_by()
796 }
797
798 fn subsource_details(
799 &self,
800 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
801 self.entry.subsource_details()
802 }
803
804 fn source_export_details(
805 &self,
806 ) -> Option<(
807 CatalogItemId,
808 &UnresolvedItemName,
809 &SourceExportDetails,
810 &SourceExportDataConfig<ReferencedConnection>,
811 )> {
812 self.entry.source_export_details()
813 }
814
815 fn is_progress_source(&self) -> bool {
816 self.entry.is_progress_source()
817 }
818
819 fn progress_id(&self) -> Option<CatalogItemId> {
820 self.entry.progress_id()
821 }
822
823 fn owner_id(&self) -> RoleId {
824 *self.entry.owner_id()
825 }
826
827 fn privileges(&self) -> &PrivilegeMap {
828 self.entry.privileges()
829 }
830
831 fn cluster_id(&self) -> Option<ClusterId> {
832 self.entry.item().cluster_id()
833 }
834
835 fn at_version(
836 &self,
837 version: RelationVersionSelector,
838 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
839 Box::new(CatalogCollectionEntry {
840 entry: self.entry.clone(),
841 version,
842 })
843 }
844
845 fn latest_version(&self) -> Option<RelationVersion> {
846 self.entry.latest_version()
847 }
848}
849
850#[derive(Debug, Clone, Serialize)]
851pub enum CatalogItem {
852 Table(Table),
853 Source(Source),
854 Log(Log),
855 View(View),
856 MaterializedView(MaterializedView),
857 Sink(Sink),
858 Index(Index),
859 Type(Type),
860 Func(Func),
861 Secret(Secret),
862 Connection(Connection),
863 ContinualTask(ContinualTask),
864}
865
866impl From<CatalogEntry> for durable::Item {
867 fn from(entry: CatalogEntry) -> durable::Item {
868 let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
869 durable::Item {
870 id: entry.id,
871 oid: entry.oid,
872 global_id,
873 schema_id: entry.name.qualifiers.schema_spec.into(),
874 name: entry.name.item,
875 create_sql,
876 owner_id: entry.owner_id,
877 privileges: entry.privileges.into_all_values().collect(),
878 extra_versions,
879 }
880 }
881}
882
883#[derive(Debug, Clone, Serialize)]
884pub struct Table {
885 pub create_sql: Option<String>,
887 pub desc: VersionedRelationDesc,
889 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
891 pub collections: BTreeMap<RelationVersion, GlobalId>,
892 #[serde(skip)]
894 pub conn_id: Option<ConnectionId>,
895 pub resolved_ids: ResolvedIds,
897 pub custom_logical_compaction_window: Option<CompactionWindow>,
899 pub is_retained_metrics_object: bool,
904 pub data_source: TableDataSource,
906}
907
908impl Table {
909 pub fn timeline(&self) -> Timeline {
910 match &self.data_source {
911 TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
914 TableDataSource::DataSource { timeline, .. } => timeline.clone(),
915 }
916 }
917
918 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
920 self.collections.values().copied()
921 }
922
923 pub fn global_id_writes(&self) -> GlobalId {
925 self.collections
926 .last_key_value()
927 .expect("at least one version of a table")
928 .1
929 .clone()
930 }
931
932 pub fn collection_descs(
934 &self,
935 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
936 self.collections.iter().map(|(version, gid)| {
937 let desc = self
938 .desc
939 .at_version(RelationVersionSelector::Specific(*version));
940 (*gid, *version, desc)
941 })
942 }
943
944 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
946 let (version, _gid) = self
947 .collections
948 .iter()
949 .find(|(_version, gid)| *gid == id)
950 .expect("GlobalId to exist");
951 self.desc
952 .at_version(RelationVersionSelector::Specific(*version))
953 }
954}
955
956#[derive(Clone, Debug, Serialize)]
957pub enum TableDataSource {
958 TableWrites {
960 #[serde(skip)]
961 defaults: Vec<Expr<Aug>>,
962 },
963
964 DataSource {
967 desc: DataSourceDesc,
968 timeline: Timeline,
969 },
970}
971
972#[derive(Debug, Clone, Serialize)]
973pub enum DataSourceDesc {
974 Ingestion {
976 desc: SourceDesc<ReferencedConnection>,
977 cluster_id: ClusterId,
978 },
979 OldSyntaxIngestion {
981 desc: SourceDesc<ReferencedConnection>,
982 cluster_id: ClusterId,
983 progress_subsource: CatalogItemId,
986 data_config: SourceExportDataConfig<ReferencedConnection>,
987 details: SourceExportDetails,
988 },
989 IngestionExport {
997 ingestion_id: CatalogItemId,
998 external_reference: UnresolvedItemName,
999 details: SourceExportDetails,
1000 data_config: SourceExportDataConfig<ReferencedConnection>,
1001 },
1002 Introspection(IntrospectionType),
1004 Progress,
1006 Webhook {
1008 validate_using: Option<WebhookValidation>,
1010 body_format: WebhookBodyFormat,
1012 headers: WebhookHeaders,
1014 cluster_id: ClusterId,
1016 },
1017}
1018
1019impl DataSourceDesc {
1020 pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1022 match &self {
1023 DataSourceDesc::Ingestion { .. } => (None, None),
1024 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1025 match &data_config.encoding.as_ref() {
1026 Some(encoding) => match &encoding.key {
1027 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1028 None => (None, Some(encoding.value.type_())),
1029 },
1030 None => (None, None),
1031 }
1032 }
1033 DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1034 Some(encoding) => match &encoding.key {
1035 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1036 None => (None, Some(encoding.value.type_())),
1037 },
1038 None => (None, None),
1039 },
1040 DataSourceDesc::Introspection(_)
1041 | DataSourceDesc::Webhook { .. }
1042 | DataSourceDesc::Progress => (None, None),
1043 }
1044 }
1045
1046 pub fn envelope(&self) -> Option<&str> {
1048 fn envelope_string(envelope: &SourceEnvelope) -> &str {
1053 match envelope {
1054 SourceEnvelope::None(_) => "none",
1055 SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1056 mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1057 mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1058 "debezium"
1062 }
1063 mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1064 "upsert-value-err-inline"
1065 }
1066 },
1067 SourceEnvelope::CdcV2 => {
1068 "materialize"
1071 }
1072 }
1073 }
1074
1075 match self {
1076 DataSourceDesc::Ingestion { .. } => None,
1081 DataSourceDesc::OldSyntaxIngestion { data_config, .. } => {
1082 Some(envelope_string(&data_config.envelope))
1083 }
1084 DataSourceDesc::IngestionExport { data_config, .. } => {
1085 Some(envelope_string(&data_config.envelope))
1086 }
1087 DataSourceDesc::Introspection(_)
1088 | DataSourceDesc::Webhook { .. }
1089 | DataSourceDesc::Progress => None,
1090 }
1091 }
1092}
1093
1094#[derive(Debug, Clone, Serialize)]
1095pub struct Source {
1096 pub create_sql: Option<String>,
1098 pub global_id: GlobalId,
1100 #[serde(skip)]
1102 pub data_source: DataSourceDesc,
1103 pub desc: RelationDesc,
1105 pub timeline: Timeline,
1107 pub resolved_ids: ResolvedIds,
1109 pub custom_logical_compaction_window: Option<CompactionWindow>,
1113 pub is_retained_metrics_object: bool,
1116}
1117
1118impl Source {
1119 pub fn new(
1126 plan: CreateSourcePlan,
1127 global_id: GlobalId,
1128 resolved_ids: ResolvedIds,
1129 custom_logical_compaction_window: Option<CompactionWindow>,
1130 is_retained_metrics_object: bool,
1131 ) -> Source {
1132 Source {
1133 create_sql: Some(plan.source.create_sql),
1134 data_source: match plan.source.data_source {
1135 mz_sql::plan::DataSourceDesc::Ingestion(desc) => DataSourceDesc::Ingestion {
1136 desc,
1137 cluster_id: plan
1138 .in_cluster
1139 .expect("ingestion-based sources must be given a cluster ID"),
1140 },
1141 mz_sql::plan::DataSourceDesc::OldSyntaxIngestion {
1142 desc,
1143 progress_subsource,
1144 data_config,
1145 details,
1146 } => DataSourceDesc::OldSyntaxIngestion {
1147 desc,
1148 cluster_id: plan
1149 .in_cluster
1150 .expect("ingestion-based sources must be given a cluster ID"),
1151 progress_subsource,
1152 data_config,
1153 details,
1154 },
1155 mz_sql::plan::DataSourceDesc::Progress => {
1156 assert!(
1157 plan.in_cluster.is_none(),
1158 "subsources must not have a host config or cluster_id defined"
1159 );
1160 DataSourceDesc::Progress
1161 }
1162 mz_sql::plan::DataSourceDesc::IngestionExport {
1163 ingestion_id,
1164 external_reference,
1165 details,
1166 data_config,
1167 } => {
1168 assert!(
1169 plan.in_cluster.is_none(),
1170 "subsources must not have a host config or cluster_id defined"
1171 );
1172 DataSourceDesc::IngestionExport {
1173 ingestion_id,
1174 external_reference,
1175 details,
1176 data_config,
1177 }
1178 }
1179 mz_sql::plan::DataSourceDesc::Webhook {
1180 validate_using,
1181 body_format,
1182 headers,
1183 cluster_id,
1184 } => {
1185 mz_ore::soft_assert_or_log!(
1186 cluster_id.is_none(),
1187 "cluster_id set at Source level for Webhooks"
1188 );
1189 DataSourceDesc::Webhook {
1190 validate_using,
1191 body_format,
1192 headers,
1193 cluster_id: plan
1194 .in_cluster
1195 .expect("webhook sources must be given a cluster ID"),
1196 }
1197 }
1198 },
1199 desc: plan.source.desc,
1200 global_id,
1201 timeline: plan.timeline,
1202 resolved_ids,
1203 custom_logical_compaction_window: plan
1204 .source
1205 .compaction_window
1206 .or(custom_logical_compaction_window),
1207 is_retained_metrics_object,
1208 }
1209 }
1210
1211 pub fn source_type(&self) -> &str {
1213 match &self.data_source {
1214 DataSourceDesc::Ingestion { desc, .. }
1215 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.name(),
1216 DataSourceDesc::Progress => "progress",
1217 DataSourceDesc::IngestionExport { .. } => "subsource",
1218 DataSourceDesc::Introspection(_) => "source",
1219 DataSourceDesc::Webhook { .. } => "webhook",
1220 }
1221 }
1222
1223 pub fn connection_id(&self) -> Option<CatalogItemId> {
1225 match &self.data_source {
1226 DataSourceDesc::Ingestion { desc, .. }
1227 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => desc.connection.connection_id(),
1228 DataSourceDesc::IngestionExport { .. }
1229 | DataSourceDesc::Introspection(_)
1230 | DataSourceDesc::Webhook { .. }
1231 | DataSourceDesc::Progress => None,
1232 }
1233 }
1234
1235 pub fn global_id(&self) -> GlobalId {
1237 self.global_id
1238 }
1239
1240 pub fn user_controllable_persist_shard_count(&self) -> i64 {
1248 match &self.data_source {
1249 DataSourceDesc::Ingestion { .. } => 0,
1250 DataSourceDesc::OldSyntaxIngestion { desc, .. } => {
1251 match &desc.connection {
1252 GenericSourceConnection::Postgres(_)
1256 | GenericSourceConnection::MySql(_)
1257 | GenericSourceConnection::SqlServer(_) => 0,
1258 GenericSourceConnection::LoadGenerator(lg) => match lg.load_generator {
1259 LoadGenerator::Clock
1261 | LoadGenerator::Counter { .. }
1262 | LoadGenerator::Datums
1263 | LoadGenerator::KeyValue(_) => 1,
1264 LoadGenerator::Auction
1265 | LoadGenerator::Marketing
1266 | LoadGenerator::Tpch { .. } => 0,
1267 },
1268 GenericSourceConnection::Kafka(_) => 1,
1269 }
1270 }
1271 DataSourceDesc::IngestionExport { .. } => 1,
1274 DataSourceDesc::Webhook { .. } => 1,
1275 DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => 0,
1278 }
1279 }
1280}
1281
1282#[derive(Debug, Clone, Serialize)]
1283pub struct Log {
1284 pub variant: LogVariant,
1286 pub global_id: GlobalId,
1288}
1289
1290impl Log {
1291 pub fn global_id(&self) -> GlobalId {
1293 self.global_id
1294 }
1295}
1296
1297#[derive(Debug, Clone, Serialize)]
1298pub struct Sink {
1299 pub create_sql: String,
1301 pub global_id: GlobalId,
1303 pub from: GlobalId,
1305 pub connection: StorageSinkConnection<ReferencedConnection>,
1307 pub envelope: SinkEnvelope,
1311 pub with_snapshot: bool,
1313 pub version: u64,
1315 pub resolved_ids: ResolvedIds,
1317 pub cluster_id: ClusterId,
1319}
1320
1321impl Sink {
1322 pub fn sink_type(&self) -> &str {
1323 self.connection.name()
1324 }
1325
1326 pub fn envelope(&self) -> Option<&str> {
1328 match &self.envelope {
1329 SinkEnvelope::Debezium => Some("debezium"),
1330 SinkEnvelope::Upsert => Some("upsert"),
1331 }
1332 }
1333
1334 pub fn combined_format(&self) -> Option<Cow<'_, str>> {
1339 match &self.connection {
1340 StorageSinkConnection::Kafka(connection) => Some(connection.format.get_format_name()),
1341 _ => None,
1342 }
1343 }
1344
1345 pub fn formats(&self) -> Option<(Option<&str>, &str)> {
1347 match &self.connection {
1348 StorageSinkConnection::Kafka(connection) => {
1349 let key_format = connection
1350 .format
1351 .key_format
1352 .as_ref()
1353 .map(|f| f.get_format_name());
1354 let value_format = connection.format.value_format.get_format_name();
1355 Some((key_format, value_format))
1356 }
1357 _ => None,
1358 }
1359 }
1360
1361 pub fn connection_id(&self) -> Option<CatalogItemId> {
1362 self.connection.connection_id()
1363 }
1364
1365 pub fn global_id(&self) -> GlobalId {
1367 self.global_id
1368 }
1369}
1370
1371#[derive(Debug, Clone, Serialize)]
1372pub struct View {
1373 pub create_sql: String,
1375 pub global_id: GlobalId,
1377 pub raw_expr: Arc<HirRelationExpr>,
1379 pub optimized_expr: Arc<OptimizedMirRelationExpr>,
1381 pub desc: RelationDesc,
1383 pub conn_id: Option<ConnectionId>,
1385 pub resolved_ids: ResolvedIds,
1387 pub dependencies: DependencyIds,
1389}
1390
1391impl View {
1392 pub fn global_id(&self) -> GlobalId {
1394 self.global_id
1395 }
1396}
1397
1398#[derive(Debug, Clone, Serialize)]
1399pub struct MaterializedView {
1400 pub create_sql: String,
1402 pub global_id: GlobalId,
1404 pub raw_expr: Arc<HirRelationExpr>,
1406 pub optimized_expr: Arc<OptimizedMirRelationExpr>,
1408 pub desc: RelationDesc,
1410 pub resolved_ids: ResolvedIds,
1412 pub dependencies: DependencyIds,
1414 pub cluster_id: ClusterId,
1416 pub non_null_assertions: Vec<usize>,
1420 pub custom_logical_compaction_window: Option<CompactionWindow>,
1422 pub refresh_schedule: Option<RefreshSchedule>,
1424 pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
1429}
1430
1431impl MaterializedView {
1432 pub fn global_id(&self) -> GlobalId {
1434 self.global_id
1435 }
1436}
1437
1438#[derive(Debug, Clone, Serialize)]
1439pub struct Index {
1440 pub create_sql: String,
1442 pub global_id: GlobalId,
1444 pub on: GlobalId,
1446 pub keys: Arc<[MirScalarExpr]>,
1448 pub conn_id: Option<ConnectionId>,
1450 pub resolved_ids: ResolvedIds,
1452 pub cluster_id: ClusterId,
1454 pub custom_logical_compaction_window: Option<CompactionWindow>,
1456 pub is_retained_metrics_object: bool,
1461}
1462
1463impl Index {
1464 pub fn global_id(&self) -> GlobalId {
1466 self.global_id
1467 }
1468}
1469
1470#[derive(Debug, Clone, Serialize)]
1471pub struct Type {
1472 pub create_sql: Option<String>,
1474 pub global_id: GlobalId,
1476 #[serde(skip)]
1477 pub details: CatalogTypeDetails<IdReference>,
1478 pub desc: Option<RelationDesc>,
1479 pub resolved_ids: ResolvedIds,
1481}
1482
1483#[derive(Debug, Clone, Serialize)]
1484pub struct Func {
1485 #[serde(skip)]
1487 pub inner: &'static mz_sql::func::Func,
1488 pub global_id: GlobalId,
1490}
1491
1492#[derive(Debug, Clone, Serialize)]
1493pub struct Secret {
1494 pub create_sql: String,
1496 pub global_id: GlobalId,
1498}
1499
1500#[derive(Debug, Clone, Serialize)]
1501pub struct Connection {
1502 pub create_sql: String,
1504 pub global_id: GlobalId,
1506 pub details: ConnectionDetails,
1508 pub resolved_ids: ResolvedIds,
1510}
1511
1512impl Connection {
1513 pub fn global_id(&self) -> GlobalId {
1515 self.global_id
1516 }
1517}
1518
1519#[derive(Debug, Clone, Serialize)]
1520pub struct ContinualTask {
1521 pub create_sql: String,
1523 pub global_id: GlobalId,
1525 pub input_id: GlobalId,
1527 pub with_snapshot: bool,
1528 pub raw_expr: Arc<HirRelationExpr>,
1533 pub desc: RelationDesc,
1535 pub resolved_ids: ResolvedIds,
1537 pub dependencies: DependencyIds,
1539 pub cluster_id: ClusterId,
1541 pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
1543}
1544
1545impl ContinualTask {
1546 pub fn global_id(&self) -> GlobalId {
1548 self.global_id
1549 }
1550}
1551
1552#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
1553pub struct NetworkPolicy {
1554 pub name: String,
1555 pub id: NetworkPolicyId,
1556 pub oid: u32,
1557 pub rules: Vec<NetworkPolicyRule>,
1558 pub owner_id: RoleId,
1559 pub privileges: PrivilegeMap,
1560}
1561
1562impl From<NetworkPolicy> for durable::NetworkPolicy {
1563 fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1564 durable::NetworkPolicy {
1565 id: policy.id,
1566 oid: policy.oid,
1567 name: policy.name,
1568 rules: policy.rules,
1569 owner_id: policy.owner_id,
1570 privileges: policy.privileges.into_all_values().collect(),
1571 }
1572 }
1573}
1574
1575impl From<durable::NetworkPolicy> for NetworkPolicy {
1576 fn from(
1577 durable::NetworkPolicy {
1578 id,
1579 oid,
1580 name,
1581 rules,
1582 owner_id,
1583 privileges,
1584 }: durable::NetworkPolicy,
1585 ) -> Self {
1586 NetworkPolicy {
1587 id,
1588 oid,
1589 name,
1590 rules,
1591 owner_id,
1592 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1593 }
1594 }
1595}
1596
1597impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1598 fn update_from(
1599 &mut self,
1600 durable::NetworkPolicy {
1601 id,
1602 oid,
1603 name,
1604 rules,
1605 owner_id,
1606 privileges,
1607 }: durable::NetworkPolicy,
1608 ) {
1609 self.id = id;
1610 self.oid = oid;
1611 self.name = name;
1612 self.rules = rules;
1613 self.owner_id = owner_id;
1614 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1615 }
1616}
1617
1618impl CatalogItem {
1619 pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1621 match self {
1622 CatalogItem::Table(_) => mz_sql::catalog::CatalogItemType::Table,
1623 CatalogItem::Source(_) => mz_sql::catalog::CatalogItemType::Source,
1624 CatalogItem::Log(_) => mz_sql::catalog::CatalogItemType::Source,
1625 CatalogItem::Sink(_) => mz_sql::catalog::CatalogItemType::Sink,
1626 CatalogItem::View(_) => mz_sql::catalog::CatalogItemType::View,
1627 CatalogItem::MaterializedView(_) => mz_sql::catalog::CatalogItemType::MaterializedView,
1628 CatalogItem::Index(_) => mz_sql::catalog::CatalogItemType::Index,
1629 CatalogItem::Type(_) => mz_sql::catalog::CatalogItemType::Type,
1630 CatalogItem::Func(_) => mz_sql::catalog::CatalogItemType::Func,
1631 CatalogItem::Secret(_) => mz_sql::catalog::CatalogItemType::Secret,
1632 CatalogItem::Connection(_) => mz_sql::catalog::CatalogItemType::Connection,
1633 CatalogItem::ContinualTask(_) => mz_sql::catalog::CatalogItemType::ContinualTask,
1634 }
1635 }
1636
1637 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1639 let gid = match self {
1640 CatalogItem::Source(source) => source.global_id,
1641 CatalogItem::Log(log) => log.global_id,
1642 CatalogItem::Sink(sink) => sink.global_id,
1643 CatalogItem::View(view) => view.global_id,
1644 CatalogItem::MaterializedView(mv) => mv.global_id,
1645 CatalogItem::ContinualTask(ct) => ct.global_id,
1646 CatalogItem::Index(index) => index.global_id,
1647 CatalogItem::Func(func) => func.global_id,
1648 CatalogItem::Type(ty) => ty.global_id,
1649 CatalogItem::Secret(secret) => secret.global_id,
1650 CatalogItem::Connection(conn) => conn.global_id,
1651 CatalogItem::Table(table) => {
1652 return itertools::Either::Left(table.collections.values().copied());
1653 }
1654 };
1655 itertools::Either::Right(std::iter::once(gid))
1656 }
1657
1658 pub fn latest_global_id(&self) -> GlobalId {
1662 match self {
1663 CatalogItem::Source(source) => source.global_id,
1664 CatalogItem::Log(log) => log.global_id,
1665 CatalogItem::Sink(sink) => sink.global_id,
1666 CatalogItem::View(view) => view.global_id,
1667 CatalogItem::MaterializedView(mv) => mv.global_id,
1668 CatalogItem::ContinualTask(ct) => ct.global_id,
1669 CatalogItem::Index(index) => index.global_id,
1670 CatalogItem::Func(func) => func.global_id,
1671 CatalogItem::Type(ty) => ty.global_id,
1672 CatalogItem::Secret(secret) => secret.global_id,
1673 CatalogItem::Connection(conn) => conn.global_id,
1674 CatalogItem::Table(table) => table.global_id_writes(),
1675 }
1676 }
1677
1678 pub fn is_storage_collection(&self) -> bool {
1680 match self {
1681 CatalogItem::Table(_)
1682 | CatalogItem::Source(_)
1683 | CatalogItem::MaterializedView(_)
1684 | CatalogItem::Sink(_)
1685 | CatalogItem::ContinualTask(_) => true,
1686 CatalogItem::Log(_)
1687 | CatalogItem::View(_)
1688 | CatalogItem::Index(_)
1689 | CatalogItem::Type(_)
1690 | CatalogItem::Func(_)
1691 | CatalogItem::Secret(_)
1692 | CatalogItem::Connection(_) => false,
1693 }
1694 }
1695
1696 pub fn desc(
1697 &self,
1698 name: &FullItemName,
1699 version: RelationVersionSelector,
1700 ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
1701 self.desc_opt(version)
1702 .ok_or_else(|| SqlCatalogError::InvalidDependency {
1703 name: name.to_string(),
1704 typ: self.typ(),
1705 })
1706 }
1707
1708 pub fn desc_opt(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1709 match &self {
1710 CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1711 CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1712 CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1713 CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1714 CatalogItem::MaterializedView(mview) => Some(Cow::Borrowed(&mview.desc)),
1715 CatalogItem::Type(typ) => typ.desc.as_ref().map(Cow::Borrowed),
1716 CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
1717 CatalogItem::Func(_)
1718 | CatalogItem::Index(_)
1719 | CatalogItem::Sink(_)
1720 | CatalogItem::Secret(_)
1721 | CatalogItem::Connection(_) => None,
1722 }
1723 }
1724
1725 pub fn func(
1726 &self,
1727 entry: &CatalogEntry,
1728 ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1729 match &self {
1730 CatalogItem::Func(func) => Ok(func.inner),
1731 _ => Err(SqlCatalogError::UnexpectedType {
1732 name: entry.name().item.to_string(),
1733 actual_type: entry.item_type(),
1734 expected_type: CatalogItemType::Func,
1735 }),
1736 }
1737 }
1738
1739 pub fn source_desc(
1740 &self,
1741 entry: &CatalogEntry,
1742 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1743 match &self {
1744 CatalogItem::Source(source) => match &source.data_source {
1745 DataSourceDesc::Ingestion { desc, .. }
1746 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => Ok(Some(desc)),
1747 DataSourceDesc::IngestionExport { .. }
1748 | DataSourceDesc::Introspection(_)
1749 | DataSourceDesc::Webhook { .. }
1750 | DataSourceDesc::Progress => Ok(None),
1751 },
1752 _ => Err(SqlCatalogError::UnexpectedType {
1753 name: entry.name().item.to_string(),
1754 actual_type: entry.item_type(),
1755 expected_type: CatalogItemType::Source,
1756 }),
1757 }
1758 }
1759
1760 pub fn is_progress_source(&self) -> bool {
1762 matches!(
1763 self,
1764 CatalogItem::Source(Source {
1765 data_source: DataSourceDesc::Progress,
1766 ..
1767 })
1768 )
1769 }
1770
1771 pub fn references(&self) -> &ResolvedIds {
1774 static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1775 match self {
1776 CatalogItem::Func(_) => &*EMPTY,
1777 CatalogItem::Index(idx) => &idx.resolved_ids,
1778 CatalogItem::Sink(sink) => &sink.resolved_ids,
1779 CatalogItem::Source(source) => &source.resolved_ids,
1780 CatalogItem::Log(_) => &*EMPTY,
1781 CatalogItem::Table(table) => &table.resolved_ids,
1782 CatalogItem::Type(typ) => &typ.resolved_ids,
1783 CatalogItem::View(view) => &view.resolved_ids,
1784 CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1785 CatalogItem::Secret(_) => &*EMPTY,
1786 CatalogItem::Connection(connection) => &connection.resolved_ids,
1787 CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
1788 }
1789 }
1790
1791 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1797 let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1798 match self {
1799 CatalogItem::Func(_) => {}
1802 CatalogItem::Index(_) => {}
1803 CatalogItem::Sink(_) => {}
1804 CatalogItem::Source(_) => {}
1805 CatalogItem::Log(_) => {}
1806 CatalogItem::Table(_) => {}
1807 CatalogItem::Type(_) => {}
1808 CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1809 CatalogItem::MaterializedView(mview) => {
1810 uses.extend(mview.dependencies.0.iter().copied())
1811 }
1812 CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
1813 CatalogItem::Secret(_) => {}
1814 CatalogItem::Connection(_) => {}
1815 }
1816 uses
1817 }
1818
1819 pub fn conn_id(&self) -> Option<&ConnectionId> {
1822 match self {
1823 CatalogItem::View(view) => view.conn_id.as_ref(),
1824 CatalogItem::Index(index) => index.conn_id.as_ref(),
1825 CatalogItem::Table(table) => table.conn_id.as_ref(),
1826 CatalogItem::Log(_)
1827 | CatalogItem::Source(_)
1828 | CatalogItem::Sink(_)
1829 | CatalogItem::MaterializedView(_)
1830 | CatalogItem::Secret(_)
1831 | CatalogItem::Type(_)
1832 | CatalogItem::Func(_)
1833 | CatalogItem::Connection(_)
1834 | CatalogItem::ContinualTask(_) => None,
1835 }
1836 }
1837
1838 pub fn is_temporary(&self) -> bool {
1840 self.conn_id().is_some()
1841 }
1842
1843 pub fn rename_schema_refs(
1844 &self,
1845 database_name: &str,
1846 cur_schema_name: &str,
1847 new_schema_name: &str,
1848 ) -> Result<CatalogItem, (String, String)> {
1849 let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
1850 let mut create_stmt = mz_sql::parse::parse(&create_sql)
1851 .expect("invalid create sql persisted to catalog")
1852 .into_element()
1853 .ast;
1854
1855 mz_sql::ast::transform::create_stmt_rename_schema_refs(
1857 &mut create_stmt,
1858 database_name,
1859 cur_schema_name,
1860 new_schema_name,
1861 )?;
1862
1863 Ok(create_stmt.to_ast_string_stable())
1864 };
1865
1866 match self {
1867 CatalogItem::Table(i) => {
1868 let mut i = i.clone();
1869 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1870 Ok(CatalogItem::Table(i))
1871 }
1872 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1873 CatalogItem::Source(i) => {
1874 let mut i = i.clone();
1875 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1876 Ok(CatalogItem::Source(i))
1877 }
1878 CatalogItem::Sink(i) => {
1879 let mut i = i.clone();
1880 i.create_sql = do_rewrite(i.create_sql)?;
1881 Ok(CatalogItem::Sink(i))
1882 }
1883 CatalogItem::View(i) => {
1884 let mut i = i.clone();
1885 i.create_sql = do_rewrite(i.create_sql)?;
1886 Ok(CatalogItem::View(i))
1887 }
1888 CatalogItem::MaterializedView(i) => {
1889 let mut i = i.clone();
1890 i.create_sql = do_rewrite(i.create_sql)?;
1891 Ok(CatalogItem::MaterializedView(i))
1892 }
1893 CatalogItem::Index(i) => {
1894 let mut i = i.clone();
1895 i.create_sql = do_rewrite(i.create_sql)?;
1896 Ok(CatalogItem::Index(i))
1897 }
1898 CatalogItem::Secret(i) => {
1899 let mut i = i.clone();
1900 i.create_sql = do_rewrite(i.create_sql)?;
1901 Ok(CatalogItem::Secret(i))
1902 }
1903 CatalogItem::Connection(i) => {
1904 let mut i = i.clone();
1905 i.create_sql = do_rewrite(i.create_sql)?;
1906 Ok(CatalogItem::Connection(i))
1907 }
1908 CatalogItem::Type(i) => {
1909 let mut i = i.clone();
1910 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1911 Ok(CatalogItem::Type(i))
1912 }
1913 CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
1914 CatalogItem::ContinualTask(i) => {
1915 let mut i = i.clone();
1916 i.create_sql = do_rewrite(i.create_sql)?;
1917 Ok(CatalogItem::ContinualTask(i))
1918 }
1919 }
1920 }
1921
1922 pub fn rename_item_refs(
1926 &self,
1927 from: FullItemName,
1928 to_item_name: String,
1929 rename_self: bool,
1930 ) -> Result<CatalogItem, String> {
1931 let do_rewrite = |create_sql: String| -> Result<String, String> {
1932 let mut create_stmt = mz_sql::parse::parse(&create_sql)
1933 .expect("invalid create sql persisted to catalog")
1934 .into_element()
1935 .ast;
1936 if rename_self {
1937 mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
1938 }
1939 mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
1941 Ok(create_stmt.to_ast_string_stable())
1942 };
1943
1944 match self {
1945 CatalogItem::Table(i) => {
1946 let mut i = i.clone();
1947 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1948 Ok(CatalogItem::Table(i))
1949 }
1950 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1951 CatalogItem::Source(i) => {
1952 let mut i = i.clone();
1953 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1954 Ok(CatalogItem::Source(i))
1955 }
1956 CatalogItem::Sink(i) => {
1957 let mut i = i.clone();
1958 i.create_sql = do_rewrite(i.create_sql)?;
1959 Ok(CatalogItem::Sink(i))
1960 }
1961 CatalogItem::View(i) => {
1962 let mut i = i.clone();
1963 i.create_sql = do_rewrite(i.create_sql)?;
1964 Ok(CatalogItem::View(i))
1965 }
1966 CatalogItem::MaterializedView(i) => {
1967 let mut i = i.clone();
1968 i.create_sql = do_rewrite(i.create_sql)?;
1969 Ok(CatalogItem::MaterializedView(i))
1970 }
1971 CatalogItem::Index(i) => {
1972 let mut i = i.clone();
1973 i.create_sql = do_rewrite(i.create_sql)?;
1974 Ok(CatalogItem::Index(i))
1975 }
1976 CatalogItem::Secret(i) => {
1977 let mut i = i.clone();
1978 i.create_sql = do_rewrite(i.create_sql)?;
1979 Ok(CatalogItem::Secret(i))
1980 }
1981 CatalogItem::Func(_) | CatalogItem::Type(_) => {
1982 unreachable!("{}s cannot be renamed", self.typ())
1983 }
1984 CatalogItem::Connection(i) => {
1985 let mut i = i.clone();
1986 i.create_sql = do_rewrite(i.create_sql)?;
1987 Ok(CatalogItem::Connection(i))
1988 }
1989 CatalogItem::ContinualTask(i) => {
1990 let mut i = i.clone();
1991 i.create_sql = do_rewrite(i.create_sql)?;
1992 Ok(CatalogItem::ContinualTask(i))
1993 }
1994 }
1995 }
1996
1997 pub fn update_retain_history(
2000 &mut self,
2001 value: Option<Value>,
2002 window: CompactionWindow,
2003 ) -> Result<Option<WithOptionValue<Raw>>, ()> {
2004 let update = |mut ast: &mut Statement<Raw>| {
2005 macro_rules! update_retain_history {
2007 ( $stmt:ident, $opt:ident, $name:ident ) => {{
2008 let pos = $stmt
2010 .with_options
2011 .iter()
2012 .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
2014 if let Some(value) = value {
2015 let next = mz_sql_parser::ast::$opt {
2016 name: mz_sql_parser::ast::$name::RetainHistory,
2017 value: Some(WithOptionValue::RetainHistoryFor(value)),
2018 };
2019 if let Some(idx) = pos {
2020 let previous = $stmt.with_options[idx].clone();
2021 $stmt.with_options[idx] = next;
2022 previous.value
2023 } else {
2024 $stmt.with_options.push(next);
2025 None
2026 }
2027 } else {
2028 if let Some(idx) = pos {
2029 $stmt.with_options.swap_remove(idx).value
2030 } else {
2031 None
2032 }
2033 }
2034 }};
2035 }
2036 let previous = match &mut ast {
2037 Statement::CreateTable(stmt) => {
2038 update_retain_history!(stmt, TableOption, TableOptionName)
2039 }
2040 Statement::CreateIndex(stmt) => {
2041 update_retain_history!(stmt, IndexOption, IndexOptionName)
2042 }
2043 Statement::CreateSource(stmt) => {
2044 update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
2045 }
2046 Statement::CreateMaterializedView(stmt) => {
2047 update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
2048 }
2049 _ => {
2050 return Err(());
2051 }
2052 };
2053 Ok(previous)
2054 };
2055
2056 let res = self.update_sql(update)?;
2057 let cw = self
2058 .custom_logical_compaction_window_mut()
2059 .expect("item must have compaction window");
2060 *cw = Some(window);
2061 Ok(res)
2062 }
2063
2064 pub fn add_column(
2065 &mut self,
2066 name: ColumnName,
2067 typ: SqlColumnType,
2068 sql: RawDataType,
2069 ) -> Result<RelationVersion, PlanError> {
2070 let CatalogItem::Table(table) = self else {
2071 return Err(PlanError::Unsupported {
2072 feature: "adding columns to a non-Table".to_string(),
2073 discussion_no: None,
2074 });
2075 };
2076 let next_version = table.desc.add_column(name.clone(), typ);
2077
2078 let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2079 Statement::CreateTable(stmt) => {
2080 let version = ColumnOptionDef {
2081 name: None,
2082 option: ColumnOption::Versioned {
2083 action: ColumnVersioned::Added,
2084 version: next_version.into(),
2085 },
2086 };
2087 let column = ColumnDef {
2088 name: name.into(),
2089 data_type: sql,
2090 collation: None,
2091 options: vec![version],
2092 };
2093 stmt.columns.push(column);
2094 Ok(())
2095 }
2096 _ => Err(()),
2097 };
2098
2099 self.update_sql(update)
2100 .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2101 Ok(next_version)
2102 }
2103
2104 pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2107 where
2108 F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2109 {
2110 let create_sql = match self {
2111 CatalogItem::Table(Table { create_sql, .. })
2112 | CatalogItem::Type(Type { create_sql, .. })
2113 | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2114 CatalogItem::Sink(Sink { create_sql, .. })
2115 | CatalogItem::View(View { create_sql, .. })
2116 | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2117 | CatalogItem::Index(Index { create_sql, .. })
2118 | CatalogItem::Secret(Secret { create_sql, .. })
2119 | CatalogItem::Connection(Connection { create_sql, .. })
2120 | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
2121 CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2122 };
2123 let Some(create_sql) = create_sql else {
2124 return Err(());
2125 };
2126 let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2127 .expect("non-system items must be parseable")
2128 .into_element()
2129 .ast;
2130 debug!("rewrite: {}", ast.to_ast_string_redacted());
2131 let t = f(&mut ast)?;
2132 *create_sql = ast.to_ast_string_stable();
2133 debug!("rewrote: {}", ast.to_ast_string_redacted());
2134 Ok(t)
2135 }
2136
2137 pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2144 match self {
2145 CatalogItem::Index(index) => Some(index.cluster_id),
2146 CatalogItem::Table(_)
2147 | CatalogItem::Source(_)
2148 | CatalogItem::Log(_)
2149 | CatalogItem::View(_)
2150 | CatalogItem::MaterializedView(_)
2151 | CatalogItem::Sink(_)
2152 | CatalogItem::Type(_)
2153 | CatalogItem::Func(_)
2154 | CatalogItem::Secret(_)
2155 | CatalogItem::Connection(_)
2156 | CatalogItem::ContinualTask(_) => None,
2157 }
2158 }
2159
2160 pub fn cluster_id(&self) -> Option<ClusterId> {
2161 match self {
2162 CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2163 CatalogItem::Index(index) => Some(index.cluster_id),
2164 CatalogItem::Source(source) => match &source.data_source {
2165 DataSourceDesc::Ingestion { cluster_id, .. }
2166 | DataSourceDesc::OldSyntaxIngestion { cluster_id, .. } => Some(*cluster_id),
2167 DataSourceDesc::IngestionExport { .. } => None,
2171 DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2172 DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => None,
2173 },
2174 CatalogItem::Sink(sink) => Some(sink.cluster_id),
2175 CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
2176 CatalogItem::Table(_)
2177 | CatalogItem::Log(_)
2178 | CatalogItem::View(_)
2179 | CatalogItem::Type(_)
2180 | CatalogItem::Func(_)
2181 | CatalogItem::Secret(_)
2182 | CatalogItem::Connection(_) => None,
2183 }
2184 }
2185
2186 pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2189 match self {
2190 CatalogItem::Table(table) => table.custom_logical_compaction_window,
2191 CatalogItem::Source(source) => source.custom_logical_compaction_window,
2192 CatalogItem::Index(index) => index.custom_logical_compaction_window,
2193 CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2194 CatalogItem::Log(_)
2195 | CatalogItem::View(_)
2196 | CatalogItem::Sink(_)
2197 | CatalogItem::Type(_)
2198 | CatalogItem::Func(_)
2199 | CatalogItem::Secret(_)
2200 | CatalogItem::Connection(_)
2201 | CatalogItem::ContinualTask(_) => None,
2202 }
2203 }
2204
2205 pub fn custom_logical_compaction_window_mut(
2209 &mut self,
2210 ) -> Option<&mut Option<CompactionWindow>> {
2211 let cw = match self {
2212 CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2213 CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2214 CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2215 CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2216 CatalogItem::Log(_)
2217 | CatalogItem::View(_)
2218 | CatalogItem::Sink(_)
2219 | CatalogItem::Type(_)
2220 | CatalogItem::Func(_)
2221 | CatalogItem::Secret(_)
2222 | CatalogItem::Connection(_)
2223 | CatalogItem::ContinualTask(_) => return None,
2224 };
2225 Some(cw)
2226 }
2227
2228 pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2236 let custom_logical_compaction_window = match self {
2237 CatalogItem::Table(_)
2238 | CatalogItem::Source(_)
2239 | CatalogItem::Index(_)
2240 | CatalogItem::MaterializedView(_)
2241 | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
2242 CatalogItem::Log(_)
2243 | CatalogItem::View(_)
2244 | CatalogItem::Sink(_)
2245 | CatalogItem::Type(_)
2246 | CatalogItem::Func(_)
2247 | CatalogItem::Secret(_)
2248 | CatalogItem::Connection(_) => return None,
2249 };
2250 Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2251 }
2252
2253 pub fn is_retained_metrics_object(&self) -> bool {
2257 match self {
2258 CatalogItem::Table(table) => table.is_retained_metrics_object,
2259 CatalogItem::Source(source) => source.is_retained_metrics_object,
2260 CatalogItem::Index(index) => index.is_retained_metrics_object,
2261 CatalogItem::Log(_)
2262 | CatalogItem::View(_)
2263 | CatalogItem::MaterializedView(_)
2264 | CatalogItem::Sink(_)
2265 | CatalogItem::Type(_)
2266 | CatalogItem::Func(_)
2267 | CatalogItem::Secret(_)
2268 | CatalogItem::Connection(_)
2269 | CatalogItem::ContinualTask(_) => false,
2270 }
2271 }
2272
2273 pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2274 match self {
2275 CatalogItem::Table(table) => {
2276 let create_sql = table
2277 .create_sql
2278 .clone()
2279 .expect("builtin tables cannot be serialized");
2280 let mut collections = table.collections.clone();
2281 let global_id = collections
2282 .remove(&RelationVersion::root())
2283 .expect("at least one version");
2284 (create_sql, global_id, collections)
2285 }
2286 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2287 CatalogItem::Source(source) => {
2288 assert!(
2289 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2290 "cannot serialize introspection/builtin sources",
2291 );
2292 let create_sql = source
2293 .create_sql
2294 .clone()
2295 .expect("builtin sources cannot be serialized");
2296 (create_sql, source.global_id, BTreeMap::new())
2297 }
2298 CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2299 CatalogItem::MaterializedView(mview) => {
2300 (mview.create_sql.clone(), mview.global_id, BTreeMap::new())
2301 }
2302 CatalogItem::Index(index) => {
2303 (index.create_sql.clone(), index.global_id, BTreeMap::new())
2304 }
2305 CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2306 CatalogItem::Type(typ) => {
2307 let create_sql = typ
2308 .create_sql
2309 .clone()
2310 .expect("builtin types cannot be serialized");
2311 (create_sql, typ.global_id, BTreeMap::new())
2312 }
2313 CatalogItem::Secret(secret) => {
2314 (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2315 }
2316 CatalogItem::Connection(connection) => (
2317 connection.create_sql.clone(),
2318 connection.global_id,
2319 BTreeMap::new(),
2320 ),
2321 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2322 CatalogItem::ContinualTask(ct) => {
2323 (ct.create_sql.clone(), ct.global_id, BTreeMap::new())
2324 }
2325 }
2326 }
2327
2328 pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2329 match self {
2330 CatalogItem::Table(mut table) => {
2331 let create_sql = table
2332 .create_sql
2333 .expect("builtin tables cannot be serialized");
2334 let global_id = table
2335 .collections
2336 .remove(&RelationVersion::root())
2337 .expect("at least one version");
2338 (create_sql, global_id, table.collections)
2339 }
2340 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2341 CatalogItem::Source(source) => {
2342 assert!(
2343 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2344 "cannot serialize introspection/builtin sources",
2345 );
2346 let create_sql = source
2347 .create_sql
2348 .expect("builtin sources cannot be serialized");
2349 (create_sql, source.global_id, BTreeMap::new())
2350 }
2351 CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2352 CatalogItem::MaterializedView(mview) => {
2353 (mview.create_sql, mview.global_id, BTreeMap::new())
2354 }
2355 CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2356 CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2357 CatalogItem::Type(typ) => {
2358 let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2359 (create_sql, typ.global_id, BTreeMap::new())
2360 }
2361 CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2362 CatalogItem::Connection(connection) => {
2363 (connection.create_sql, connection.global_id, BTreeMap::new())
2364 }
2365 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2366 CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
2367 }
2368 }
2369}
2370
2371impl CatalogEntry {
2372 pub fn desc_latest(
2377 &self,
2378 name: &FullItemName,
2379 ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
2380 self.item.desc(name, RelationVersionSelector::Latest)
2381 }
2382
2383 pub fn desc_opt_latest(&self) -> Option<Cow<'_, RelationDesc>> {
2386 self.item.desc_opt(RelationVersionSelector::Latest)
2387 }
2388
2389 pub fn has_columns(&self) -> bool {
2391 self.desc_opt_latest().is_some()
2392 }
2393
2394 pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2396 self.item.func(self)
2397 }
2398
2399 pub fn index(&self) -> Option<&Index> {
2401 match self.item() {
2402 CatalogItem::Index(idx) => Some(idx),
2403 _ => None,
2404 }
2405 }
2406
2407 pub fn materialized_view(&self) -> Option<&MaterializedView> {
2409 match self.item() {
2410 CatalogItem::MaterializedView(mv) => Some(mv),
2411 _ => None,
2412 }
2413 }
2414
2415 pub fn table(&self) -> Option<&Table> {
2417 match self.item() {
2418 CatalogItem::Table(tbl) => Some(tbl),
2419 _ => None,
2420 }
2421 }
2422
2423 pub fn source(&self) -> Option<&Source> {
2425 match self.item() {
2426 CatalogItem::Source(src) => Some(src),
2427 _ => None,
2428 }
2429 }
2430
2431 pub fn sink(&self) -> Option<&Sink> {
2433 match self.item() {
2434 CatalogItem::Sink(sink) => Some(sink),
2435 _ => None,
2436 }
2437 }
2438
2439 pub fn secret(&self) -> Option<&Secret> {
2441 match self.item() {
2442 CatalogItem::Secret(secret) => Some(secret),
2443 _ => None,
2444 }
2445 }
2446
2447 pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2448 match self.item() {
2449 CatalogItem::Connection(connection) => Ok(connection),
2450 _ => {
2451 let db_name = match self.name().qualifiers.database_spec {
2452 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2453 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2454 };
2455 Err(SqlCatalogError::UnknownConnection(format!(
2456 "{}{}.{}",
2457 db_name,
2458 self.name().qualifiers.schema_spec,
2459 self.name().item
2460 )))
2461 }
2462 }
2463 }
2464
2465 pub fn source_desc(
2468 &self,
2469 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2470 self.item.source_desc(self)
2471 }
2472
2473 pub fn is_connection(&self) -> bool {
2475 matches!(self.item(), CatalogItem::Connection(_))
2476 }
2477
2478 pub fn is_table(&self) -> bool {
2480 matches!(self.item(), CatalogItem::Table(_))
2481 }
2482
2483 pub fn is_source(&self) -> bool {
2486 matches!(self.item(), CatalogItem::Source(_))
2487 }
2488
2489 pub fn subsource_details(
2492 &self,
2493 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2494 match &self.item() {
2495 CatalogItem::Source(source) => match &source.data_source {
2496 DataSourceDesc::IngestionExport {
2497 ingestion_id,
2498 external_reference,
2499 details,
2500 data_config: _,
2501 } => Some((*ingestion_id, external_reference, details)),
2502 _ => None,
2503 },
2504 _ => None,
2505 }
2506 }
2507
2508 pub fn source_export_details(
2511 &self,
2512 ) -> Option<(
2513 CatalogItemId,
2514 &UnresolvedItemName,
2515 &SourceExportDetails,
2516 &SourceExportDataConfig<ReferencedConnection>,
2517 )> {
2518 match &self.item() {
2519 CatalogItem::Source(source) => match &source.data_source {
2520 DataSourceDesc::IngestionExport {
2521 ingestion_id,
2522 external_reference,
2523 details,
2524 data_config,
2525 } => Some((*ingestion_id, external_reference, details, data_config)),
2526 _ => None,
2527 },
2528 CatalogItem::Table(table) => match &table.data_source {
2529 TableDataSource::DataSource {
2530 desc:
2531 DataSourceDesc::IngestionExport {
2532 ingestion_id,
2533 external_reference,
2534 details,
2535 data_config,
2536 },
2537 timeline: _,
2538 } => Some((*ingestion_id, external_reference, details, data_config)),
2539 _ => None,
2540 },
2541 _ => None,
2542 }
2543 }
2544
2545 pub fn is_progress_source(&self) -> bool {
2547 self.item().is_progress_source()
2548 }
2549
2550 pub fn progress_id(&self) -> Option<CatalogItemId> {
2552 match &self.item() {
2553 CatalogItem::Source(source) => match &source.data_source {
2554 DataSourceDesc::Ingestion { .. } => Some(self.id),
2555 DataSourceDesc::OldSyntaxIngestion {
2556 progress_subsource, ..
2557 } => Some(*progress_subsource),
2558 DataSourceDesc::IngestionExport { .. }
2559 | DataSourceDesc::Introspection(_)
2560 | DataSourceDesc::Progress
2561 | DataSourceDesc::Webhook { .. } => None,
2562 },
2563 CatalogItem::Table(_)
2564 | CatalogItem::Log(_)
2565 | CatalogItem::View(_)
2566 | CatalogItem::MaterializedView(_)
2567 | CatalogItem::Sink(_)
2568 | CatalogItem::Index(_)
2569 | CatalogItem::Type(_)
2570 | CatalogItem::Func(_)
2571 | CatalogItem::Secret(_)
2572 | CatalogItem::Connection(_)
2573 | CatalogItem::ContinualTask(_) => None,
2574 }
2575 }
2576
2577 pub fn is_sink(&self) -> bool {
2579 matches!(self.item(), CatalogItem::Sink(_))
2580 }
2581
2582 pub fn is_materialized_view(&self) -> bool {
2584 matches!(self.item(), CatalogItem::MaterializedView(_))
2585 }
2586
2587 pub fn is_view(&self) -> bool {
2589 matches!(self.item(), CatalogItem::View(_))
2590 }
2591
2592 pub fn is_secret(&self) -> bool {
2594 matches!(self.item(), CatalogItem::Secret(_))
2595 }
2596
2597 pub fn is_introspection_source(&self) -> bool {
2599 matches!(self.item(), CatalogItem::Log(_))
2600 }
2601
2602 pub fn is_index(&self) -> bool {
2604 matches!(self.item(), CatalogItem::Index(_))
2605 }
2606
2607 pub fn is_continual_task(&self) -> bool {
2609 matches!(self.item(), CatalogItem::ContinualTask(_))
2610 }
2611
2612 pub fn is_relation(&self) -> bool {
2614 mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2615 }
2616
2617 pub fn references(&self) -> &ResolvedIds {
2620 self.item.references()
2621 }
2622
2623 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2629 self.item.uses()
2630 }
2631
2632 pub fn item(&self) -> &CatalogItem {
2634 &self.item
2635 }
2636
2637 pub fn id(&self) -> CatalogItemId {
2639 self.id
2640 }
2641
2642 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2644 self.item().global_ids()
2645 }
2646
2647 pub fn latest_global_id(&self) -> GlobalId {
2648 self.item().latest_global_id()
2649 }
2650
2651 pub fn oid(&self) -> u32 {
2653 self.oid
2654 }
2655
2656 pub fn name(&self) -> &QualifiedItemName {
2658 &self.name
2659 }
2660
2661 pub fn referenced_by(&self) -> &[CatalogItemId] {
2663 &self.referenced_by
2664 }
2665
2666 pub fn used_by(&self) -> &[CatalogItemId] {
2668 &self.used_by
2669 }
2670
2671 pub fn conn_id(&self) -> Option<&ConnectionId> {
2674 self.item.conn_id()
2675 }
2676
2677 pub fn owner_id(&self) -> &RoleId {
2679 &self.owner_id
2680 }
2681
2682 pub fn privileges(&self) -> &PrivilegeMap {
2684 &self.privileges
2685 }
2686}
2687
2688#[derive(Debug, Clone, Default)]
2689pub struct CommentsMap {
2690 map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
2691}
2692
2693impl CommentsMap {
2694 pub fn update_comment(
2695 &mut self,
2696 object_id: CommentObjectId,
2697 sub_component: Option<usize>,
2698 comment: Option<String>,
2699 ) -> Option<String> {
2700 let object_comments = self.map.entry(object_id).or_default();
2701
2702 let (empty, prev) = if let Some(comment) = comment {
2704 let prev = object_comments.insert(sub_component, comment);
2705 (false, prev)
2706 } else {
2707 let prev = object_comments.remove(&sub_component);
2708 (object_comments.is_empty(), prev)
2709 };
2710
2711 if empty {
2713 self.map.remove(&object_id);
2714 }
2715
2716 prev
2718 }
2719
2720 pub fn drop_comments(
2726 &mut self,
2727 object_ids: &BTreeSet<CommentObjectId>,
2728 ) -> Vec<(CommentObjectId, Option<usize>, String)> {
2729 let mut removed_comments = Vec::new();
2730
2731 for object_id in object_ids {
2732 if let Some(comments) = self.map.remove(object_id) {
2733 let removed = comments
2734 .into_iter()
2735 .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
2736 removed_comments.extend(removed);
2737 }
2738 }
2739
2740 removed_comments
2741 }
2742
2743 pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
2744 self.map
2745 .iter()
2746 .map(|(id, comments)| {
2747 comments
2748 .iter()
2749 .map(|(pos, comment)| (*id, *pos, comment.as_str()))
2750 })
2751 .flatten()
2752 }
2753
2754 pub fn get_object_comments(
2755 &self,
2756 object_id: CommentObjectId,
2757 ) -> Option<&BTreeMap<Option<usize>, String>> {
2758 self.map.get(&object_id)
2759 }
2760}
2761
2762impl Serialize for CommentsMap {
2763 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2764 where
2765 S: serde::Serializer,
2766 {
2767 let comment_count = self
2768 .map
2769 .iter()
2770 .map(|(_object_id, comments)| comments.len())
2771 .sum();
2772
2773 let mut seq = serializer.serialize_seq(Some(comment_count))?;
2774 for (object_id, sub) in &self.map {
2775 for (sub_component, comment) in sub {
2776 seq.serialize_element(&(
2777 format!("{object_id:?}"),
2778 format!("{sub_component:?}"),
2779 comment,
2780 ))?;
2781 }
2782 }
2783 seq.end()
2784 }
2785}
2786
2787#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
2788pub struct DefaultPrivileges {
2789 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
2790 privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
2791}
2792
2793#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
2796struct RoleDefaultPrivileges(
2797 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
2799 BTreeMap<RoleId, DefaultPrivilegeAclItem>,
2800);
2801
2802impl Deref for RoleDefaultPrivileges {
2803 type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
2804
2805 fn deref(&self) -> &Self::Target {
2806 &self.0
2807 }
2808}
2809
2810impl DerefMut for RoleDefaultPrivileges {
2811 fn deref_mut(&mut self) -> &mut Self::Target {
2812 &mut self.0
2813 }
2814}
2815
2816impl DefaultPrivileges {
2817 pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
2819 if privilege.acl_mode.is_empty() {
2820 return;
2821 }
2822
2823 let privileges = self.privileges.entry(object).or_default();
2824 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2825 default_privilege.acl_mode |= privilege.acl_mode;
2826 } else {
2827 privileges.insert(privilege.grantee, privilege);
2828 }
2829 }
2830
2831 pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
2833 if let Some(privileges) = self.privileges.get_mut(object) {
2834 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2835 default_privilege.acl_mode =
2836 default_privilege.acl_mode.difference(privilege.acl_mode);
2837 if default_privilege.acl_mode.is_empty() {
2838 privileges.remove(&privilege.grantee);
2839 }
2840 }
2841 if privileges.is_empty() {
2842 self.privileges.remove(object);
2843 }
2844 }
2845 }
2846
2847 pub fn get_privileges_for_grantee(
2850 &self,
2851 object: &DefaultPrivilegeObject,
2852 grantee: &RoleId,
2853 ) -> Option<&AclMode> {
2854 self.privileges
2855 .get(object)
2856 .and_then(|privileges| privileges.get(grantee))
2857 .map(|privilege| &privilege.acl_mode)
2858 }
2859
2860 pub fn get_applicable_privileges(
2862 &self,
2863 role_id: RoleId,
2864 database_id: Option<DatabaseId>,
2865 schema_id: Option<SchemaId>,
2866 object_type: mz_sql::catalog::ObjectType,
2867 ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
2868 let privilege_object_type = if object_type.is_relation() {
2872 mz_sql::catalog::ObjectType::Table
2873 } else {
2874 object_type
2875 };
2876 let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
2877
2878 [
2882 DefaultPrivilegeObject {
2883 role_id,
2884 database_id,
2885 schema_id,
2886 object_type: privilege_object_type,
2887 },
2888 DefaultPrivilegeObject {
2889 role_id,
2890 database_id,
2891 schema_id: None,
2892 object_type: privilege_object_type,
2893 },
2894 DefaultPrivilegeObject {
2895 role_id,
2896 database_id: None,
2897 schema_id: None,
2898 object_type: privilege_object_type,
2899 },
2900 DefaultPrivilegeObject {
2901 role_id: RoleId::Public,
2902 database_id,
2903 schema_id,
2904 object_type: privilege_object_type,
2905 },
2906 DefaultPrivilegeObject {
2907 role_id: RoleId::Public,
2908 database_id,
2909 schema_id: None,
2910 object_type: privilege_object_type,
2911 },
2912 DefaultPrivilegeObject {
2913 role_id: RoleId::Public,
2914 database_id: None,
2915 schema_id: None,
2916 object_type: privilege_object_type,
2917 },
2918 ]
2919 .into_iter()
2920 .filter_map(|object| self.privileges.get(&object))
2921 .flat_map(|acl_map| acl_map.values())
2922 .fold(
2924 BTreeMap::new(),
2925 |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
2926 let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
2927 *accum_acl_mode |= *acl_mode;
2928 accum
2929 },
2930 )
2931 .into_iter()
2932 .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
2937 .filter(|(_, acl_mode)| !acl_mode.is_empty())
2939 .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
2940 grantee: *grantee,
2941 acl_mode,
2942 })
2943 }
2944
2945 pub fn iter(
2946 &self,
2947 ) -> impl Iterator<
2948 Item = (
2949 &DefaultPrivilegeObject,
2950 impl Iterator<Item = &DefaultPrivilegeAclItem>,
2951 ),
2952 > {
2953 self.privileges
2954 .iter()
2955 .map(|(object, acl_map)| (object, acl_map.values()))
2956 }
2957}
2958
2959#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
2960pub struct ClusterConfig {
2961 pub variant: ClusterVariant,
2962 pub workload_class: Option<String>,
2963}
2964
2965impl ClusterConfig {
2966 pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
2967 match &self.variant {
2968 ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
2969 ClusterVariant::Unmanaged => None,
2970 }
2971 }
2972}
2973
2974impl From<ClusterConfig> for durable::ClusterConfig {
2975 fn from(config: ClusterConfig) -> Self {
2976 Self {
2977 variant: config.variant.into(),
2978 workload_class: config.workload_class,
2979 }
2980 }
2981}
2982
2983impl From<durable::ClusterConfig> for ClusterConfig {
2984 fn from(config: durable::ClusterConfig) -> Self {
2985 Self {
2986 variant: config.variant.into(),
2987 workload_class: config.workload_class,
2988 }
2989 }
2990}
2991
2992#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
2993pub struct ClusterVariantManaged {
2994 pub size: String,
2995 pub availability_zones: Vec<String>,
2996 pub logging: ReplicaLogging,
2997 pub replication_factor: u32,
2998 pub optimizer_feature_overrides: OptimizerFeatureOverrides,
2999 pub schedule: ClusterSchedule,
3000}
3001
3002impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
3003 fn from(managed: ClusterVariantManaged) -> Self {
3004 Self {
3005 size: managed.size,
3006 availability_zones: managed.availability_zones,
3007 logging: managed.logging,
3008 replication_factor: managed.replication_factor,
3009 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3010 schedule: managed.schedule,
3011 }
3012 }
3013}
3014
3015impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
3016 fn from(managed: durable::ClusterVariantManaged) -> Self {
3017 Self {
3018 size: managed.size,
3019 availability_zones: managed.availability_zones,
3020 logging: managed.logging,
3021 replication_factor: managed.replication_factor,
3022 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
3023 schedule: managed.schedule,
3024 }
3025 }
3026}
3027
3028#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
3029pub enum ClusterVariant {
3030 Managed(ClusterVariantManaged),
3031 Unmanaged,
3032}
3033
3034impl From<ClusterVariant> for durable::ClusterVariant {
3035 fn from(variant: ClusterVariant) -> Self {
3036 match variant {
3037 ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3038 ClusterVariant::Unmanaged => Self::Unmanaged,
3039 }
3040 }
3041}
3042
3043impl From<durable::ClusterVariant> for ClusterVariant {
3044 fn from(variant: durable::ClusterVariant) -> Self {
3045 match variant {
3046 durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3047 durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3048 }
3049 }
3050}
3051
3052impl mz_sql::catalog::CatalogDatabase for Database {
3053 fn name(&self) -> &str {
3054 &self.name
3055 }
3056
3057 fn id(&self) -> DatabaseId {
3058 self.id
3059 }
3060
3061 fn has_schemas(&self) -> bool {
3062 !self.schemas_by_name.is_empty()
3063 }
3064
3065 fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
3066 &self.schemas_by_name
3067 }
3068
3069 #[allow(clippy::as_conversions)]
3071 fn schemas(&self) -> Vec<&dyn CatalogSchema> {
3072 self.schemas_by_id
3073 .values()
3074 .map(|schema| schema as &dyn CatalogSchema)
3075 .collect()
3076 }
3077
3078 fn owner_id(&self) -> RoleId {
3079 self.owner_id
3080 }
3081
3082 fn privileges(&self) -> &PrivilegeMap {
3083 &self.privileges
3084 }
3085}
3086
3087impl mz_sql::catalog::CatalogSchema for Schema {
3088 fn database(&self) -> &ResolvedDatabaseSpecifier {
3089 &self.name.database
3090 }
3091
3092 fn name(&self) -> &QualifiedSchemaName {
3093 &self.name
3094 }
3095
3096 fn id(&self) -> &SchemaSpecifier {
3097 &self.id
3098 }
3099
3100 fn has_items(&self) -> bool {
3101 !self.items.is_empty()
3102 }
3103
3104 fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
3105 Box::new(
3106 self.items
3107 .values()
3108 .chain(self.functions.values())
3109 .chain(self.types.values())
3110 .copied(),
3111 )
3112 }
3113
3114 fn owner_id(&self) -> RoleId {
3115 self.owner_id
3116 }
3117
3118 fn privileges(&self) -> &PrivilegeMap {
3119 &self.privileges
3120 }
3121}
3122
3123impl mz_sql::catalog::CatalogRole for Role {
3124 fn name(&self) -> &str {
3125 &self.name
3126 }
3127
3128 fn id(&self) -> RoleId {
3129 self.id
3130 }
3131
3132 fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
3133 &self.membership.map
3134 }
3135
3136 fn attributes(&self) -> &RoleAttributes {
3137 &self.attributes
3138 }
3139
3140 fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
3141 &self.vars.map
3142 }
3143}
3144
3145impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
3146 fn name(&self) -> &str {
3147 &self.name
3148 }
3149
3150 fn id(&self) -> NetworkPolicyId {
3151 self.id
3152 }
3153
3154 fn owner_id(&self) -> RoleId {
3155 self.owner_id
3156 }
3157
3158 fn privileges(&self) -> &PrivilegeMap {
3159 &self.privileges
3160 }
3161}
3162
3163impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
3164 fn name(&self) -> &str {
3165 &self.name
3166 }
3167
3168 fn id(&self) -> ClusterId {
3169 self.id
3170 }
3171
3172 fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
3173 &self.bound_objects
3174 }
3175
3176 fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
3177 &self.replica_id_by_name_
3178 }
3179
3180 #[allow(clippy::as_conversions)]
3182 fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
3183 self.replicas()
3184 .map(|replica| replica as &dyn CatalogClusterReplica)
3185 .collect()
3186 }
3187
3188 fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
3189 self.replica(id).expect("catalog out of sync")
3190 }
3191
3192 fn owner_id(&self) -> RoleId {
3193 self.owner_id
3194 }
3195
3196 fn privileges(&self) -> &PrivilegeMap {
3197 &self.privileges
3198 }
3199
3200 fn is_managed(&self) -> bool {
3201 self.is_managed()
3202 }
3203
3204 fn managed_size(&self) -> Option<&str> {
3205 match &self.config.variant {
3206 ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
3207 _ => None,
3208 }
3209 }
3210
3211 fn schedule(&self) -> Option<&ClusterSchedule> {
3212 match &self.config.variant {
3213 ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
3214 _ => None,
3215 }
3216 }
3217
3218 fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
3219 self.try_to_plan()
3220 }
3221}
3222
3223impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
3224 fn name(&self) -> &str {
3225 &self.name
3226 }
3227
3228 fn cluster_id(&self) -> ClusterId {
3229 self.cluster_id
3230 }
3231
3232 fn replica_id(&self) -> ReplicaId {
3233 self.replica_id
3234 }
3235
3236 fn owner_id(&self) -> RoleId {
3237 self.owner_id
3238 }
3239
3240 fn internal(&self) -> bool {
3241 self.config.location.internal()
3242 }
3243}
3244
3245impl mz_sql::catalog::CatalogItem for CatalogEntry {
3246 fn name(&self) -> &QualifiedItemName {
3247 self.name()
3248 }
3249
3250 fn id(&self) -> CatalogItemId {
3251 self.id()
3252 }
3253
3254 fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3255 Box::new(self.global_ids())
3256 }
3257
3258 fn oid(&self) -> u32 {
3259 self.oid()
3260 }
3261
3262 fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3263 self.func()
3264 }
3265
3266 fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3267 self.source_desc()
3268 }
3269
3270 fn connection(
3271 &self,
3272 ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3273 {
3274 Ok(self.connection()?.details.to_connection())
3275 }
3276
3277 fn create_sql(&self) -> &str {
3278 match self.item() {
3279 CatalogItem::Table(Table { create_sql, .. }) => {
3280 create_sql.as_deref().unwrap_or("<builtin>")
3281 }
3282 CatalogItem::Source(Source { create_sql, .. }) => {
3283 create_sql.as_deref().unwrap_or("<builtin>")
3284 }
3285 CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3286 CatalogItem::View(View { create_sql, .. }) => create_sql,
3287 CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3288 CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3289 CatalogItem::Type(Type { create_sql, .. }) => {
3290 create_sql.as_deref().unwrap_or("<builtin>")
3291 }
3292 CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3293 CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3294 CatalogItem::Func(_) => "<builtin>",
3295 CatalogItem::Log(_) => "<builtin>",
3296 CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
3297 }
3298 }
3299
3300 fn item_type(&self) -> SqlCatalogItemType {
3301 self.item().typ()
3302 }
3303
3304 fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3305 if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3306 Some((keys, *on))
3307 } else {
3308 None
3309 }
3310 }
3311
3312 fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3313 if let CatalogItem::Table(Table {
3314 data_source: TableDataSource::TableWrites { defaults },
3315 ..
3316 }) = self.item()
3317 {
3318 Some(defaults.as_slice())
3319 } else {
3320 None
3321 }
3322 }
3323
3324 fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3325 if let CatalogItem::Type(Type { details, .. }) = self.item() {
3326 Some(details)
3327 } else {
3328 None
3329 }
3330 }
3331
3332 fn references(&self) -> &ResolvedIds {
3333 self.references()
3334 }
3335
3336 fn uses(&self) -> BTreeSet<CatalogItemId> {
3337 self.uses()
3338 }
3339
3340 fn referenced_by(&self) -> &[CatalogItemId] {
3341 self.referenced_by()
3342 }
3343
3344 fn used_by(&self) -> &[CatalogItemId] {
3345 self.used_by()
3346 }
3347
3348 fn subsource_details(
3349 &self,
3350 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3351 self.subsource_details()
3352 }
3353
3354 fn source_export_details(
3355 &self,
3356 ) -> Option<(
3357 CatalogItemId,
3358 &UnresolvedItemName,
3359 &SourceExportDetails,
3360 &SourceExportDataConfig<ReferencedConnection>,
3361 )> {
3362 self.source_export_details()
3363 }
3364
3365 fn is_progress_source(&self) -> bool {
3366 self.is_progress_source()
3367 }
3368
3369 fn progress_id(&self) -> Option<CatalogItemId> {
3370 self.progress_id()
3371 }
3372
3373 fn owner_id(&self) -> RoleId {
3374 self.owner_id
3375 }
3376
3377 fn privileges(&self) -> &PrivilegeMap {
3378 &self.privileges
3379 }
3380
3381 fn cluster_id(&self) -> Option<ClusterId> {
3382 self.item().cluster_id()
3383 }
3384
3385 fn at_version(
3386 &self,
3387 version: RelationVersionSelector,
3388 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3389 Box::new(CatalogCollectionEntry {
3390 entry: self.clone(),
3391 version,
3392 })
3393 }
3394
3395 fn latest_version(&self) -> Option<RelationVersion> {
3396 self.table().map(|t| t.desc.latest_version())
3397 }
3398}
3399
3400#[derive(Debug)]
3402pub struct StateUpdate {
3403 pub kind: StateUpdateKind,
3404 pub ts: Timestamp,
3405 pub diff: StateDiff,
3406}
3407
3408#[derive(Debug, Clone)]
3412pub enum StateUpdateKind {
3413 Role(durable::objects::Role),
3414 RoleAuth(durable::objects::RoleAuth),
3415 Database(durable::objects::Database),
3416 Schema(durable::objects::Schema),
3417 DefaultPrivilege(durable::objects::DefaultPrivilege),
3418 SystemPrivilege(MzAclItem),
3419 SystemConfiguration(durable::objects::SystemConfiguration),
3420 Cluster(durable::objects::Cluster),
3421 NetworkPolicy(durable::objects::NetworkPolicy),
3422 IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3423 ClusterReplica(durable::objects::ClusterReplica),
3424 SourceReferences(durable::objects::SourceReferences),
3425 SystemObjectMapping(durable::objects::SystemObjectMapping),
3426 TemporaryItem(TemporaryItem),
3429 Item(durable::objects::Item),
3430 Comment(durable::objects::Comment),
3431 AuditLog(durable::objects::AuditLog),
3432 StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3434 UnfinalizedShard(durable::objects::UnfinalizedShard),
3435}
3436
3437#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
3439pub enum StateDiff {
3440 Retraction,
3441 Addition,
3442}
3443
3444impl From<StateDiff> for Diff {
3445 fn from(diff: StateDiff) -> Self {
3446 match diff {
3447 StateDiff::Retraction => Diff::MINUS_ONE,
3448 StateDiff::Addition => Diff::ONE,
3449 }
3450 }
3451}
3452impl TryFrom<Diff> for StateDiff {
3453 type Error = String;
3454
3455 fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3456 match diff {
3457 Diff::MINUS_ONE => Ok(Self::Retraction),
3458 Diff::ONE => Ok(Self::Addition),
3459 diff => Err(format!("invalid diff {diff}")),
3460 }
3461 }
3462}
3463
3464#[derive(Debug, Clone)]
3466pub struct TemporaryItem {
3467 pub id: CatalogItemId,
3468 pub oid: u32,
3469 pub name: QualifiedItemName,
3470 pub item: CatalogItem,
3471 pub owner_id: RoleId,
3472 pub privileges: PrivilegeMap,
3473}
3474
3475impl From<CatalogEntry> for TemporaryItem {
3476 fn from(entry: CatalogEntry) -> Self {
3477 TemporaryItem {
3478 id: entry.id,
3479 oid: entry.oid,
3480 name: entry.name,
3481 item: entry.item,
3482 owner_id: entry.owner_id,
3483 privileges: entry.privileges,
3484 }
3485 }
3486}
3487
3488#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
3490pub enum BootstrapStateUpdateKind {
3491 Role(durable::objects::Role),
3492 RoleAuth(durable::objects::RoleAuth),
3493 Database(durable::objects::Database),
3494 Schema(durable::objects::Schema),
3495 DefaultPrivilege(durable::objects::DefaultPrivilege),
3496 SystemPrivilege(MzAclItem),
3497 SystemConfiguration(durable::objects::SystemConfiguration),
3498 Cluster(durable::objects::Cluster),
3499 NetworkPolicy(durable::objects::NetworkPolicy),
3500 IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3501 ClusterReplica(durable::objects::ClusterReplica),
3502 SourceReferences(durable::objects::SourceReferences),
3503 SystemObjectMapping(durable::objects::SystemObjectMapping),
3504 Item(durable::objects::Item),
3505 Comment(durable::objects::Comment),
3506 AuditLog(durable::objects::AuditLog),
3507 StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3509 UnfinalizedShard(durable::objects::UnfinalizedShard),
3510}
3511
3512impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3513 fn from(value: BootstrapStateUpdateKind) -> Self {
3514 match value {
3515 BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3516 BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3517 BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3518 BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3519 BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3520 StateUpdateKind::DefaultPrivilege(kind)
3521 }
3522 BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3523 StateUpdateKind::SystemPrivilege(kind)
3524 }
3525 BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3526 StateUpdateKind::SystemConfiguration(kind)
3527 }
3528 BootstrapStateUpdateKind::SourceReferences(kind) => {
3529 StateUpdateKind::SourceReferences(kind)
3530 }
3531 BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3532 BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3533 BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3534 StateUpdateKind::IntrospectionSourceIndex(kind)
3535 }
3536 BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3537 BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3538 StateUpdateKind::SystemObjectMapping(kind)
3539 }
3540 BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3541 BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3542 BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3543 BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3544 StateUpdateKind::StorageCollectionMetadata(kind)
3545 }
3546 BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3547 StateUpdateKind::UnfinalizedShard(kind)
3548 }
3549 }
3550 }
3551}
3552
3553impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3554 type Error = TemporaryItem;
3555
3556 fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3557 match value {
3558 StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3559 StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3560 StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3561 StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3562 StateUpdateKind::DefaultPrivilege(kind) => {
3563 Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3564 }
3565 StateUpdateKind::SystemPrivilege(kind) => {
3566 Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3567 }
3568 StateUpdateKind::SystemConfiguration(kind) => {
3569 Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3570 }
3571 StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3572 StateUpdateKind::NetworkPolicy(kind) => {
3573 Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3574 }
3575 StateUpdateKind::IntrospectionSourceIndex(kind) => {
3576 Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3577 }
3578 StateUpdateKind::ClusterReplica(kind) => {
3579 Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3580 }
3581 StateUpdateKind::SourceReferences(kind) => {
3582 Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3583 }
3584 StateUpdateKind::SystemObjectMapping(kind) => {
3585 Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3586 }
3587 StateUpdateKind::TemporaryItem(kind) => Err(kind),
3588 StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3589 StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3590 StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3591 StateUpdateKind::StorageCollectionMetadata(kind) => {
3592 Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3593 }
3594 StateUpdateKind::UnfinalizedShard(kind) => {
3595 Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3596 }
3597 }
3598 }
3599}