1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18
19use chrono::{DateTime, Utc};
20use mz_adapter_types::compaction::CompactionWindow;
21use mz_adapter_types::connection::ConnectionId;
22use mz_compute_client::logging::LogVariant;
23use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
24use mz_controller_types::{ClusterId, ReplicaId};
25use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
26use mz_ore::collections::CollectionExt;
27use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
28use mz_repr::network_policy_id::NetworkPolicyId;
29use mz_repr::optimize::OptimizerFeatureOverrides;
30use mz_repr::refresh_schedule::RefreshSchedule;
31use mz_repr::role_id::RoleId;
32use mz_repr::{
33 CatalogItemId, ColumnName, ColumnType, Diff, GlobalId, RelationDesc, RelationVersion,
34 RelationVersionSelector, Timestamp, VersionedRelationDesc,
35};
36use mz_sql::ast::display::AstDisplay;
37use mz_sql::ast::{
38 ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
39 UnresolvedItemName, Value, WithOptionValue,
40};
41use mz_sql::catalog::{
42 CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
43 CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogTypeDetails,
44 DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, RoleAttributes, RoleMembership,
45 RoleVars, SystemObjectType,
46};
47use mz_sql::names::{
48 Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
49 QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
50};
51use mz_sql::plan::{
52 ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
53 CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
54 HirRelationExpr, Ingestion as PlanIngestion, NetworkPolicyRule, PlanError, WebhookBodyFormat,
55 WebhookHeaders, WebhookValidation,
56};
57use mz_sql::rbac;
58use mz_sql::session::vars::OwnedVarInput;
59use mz_storage_client::controller::IntrospectionType;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
62use mz_storage_types::sources::{
63 GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
64 SourceExportDetails, Timeline,
65};
66use serde::ser::SerializeSeq;
67use serde::{Deserialize, Serialize};
68use timely::progress::Antichain;
69use tracing::debug;
70
71use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
72use crate::durable;
73
74pub trait UpdateFrom<T>: From<T> {
76 fn update_from(&mut self, from: T);
77}
78
79#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
80pub struct Database {
81 pub name: String,
82 pub id: DatabaseId,
83 pub oid: u32,
84 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
85 pub schemas_by_id: BTreeMap<SchemaId, Schema>,
86 pub schemas_by_name: BTreeMap<String, SchemaId>,
87 pub owner_id: RoleId,
88 pub privileges: PrivilegeMap,
89}
90
91impl From<Database> for durable::Database {
92 fn from(database: Database) -> durable::Database {
93 durable::Database {
94 id: database.id,
95 oid: database.oid,
96 name: database.name,
97 owner_id: database.owner_id,
98 privileges: database.privileges.into_all_values().collect(),
99 }
100 }
101}
102
103impl From<durable::Database> for Database {
104 fn from(
105 durable::Database {
106 id,
107 oid,
108 name,
109 owner_id,
110 privileges,
111 }: durable::Database,
112 ) -> Database {
113 Database {
114 id,
115 oid,
116 schemas_by_id: BTreeMap::new(),
117 schemas_by_name: BTreeMap::new(),
118 name,
119 owner_id,
120 privileges: PrivilegeMap::from_mz_acl_items(privileges),
121 }
122 }
123}
124
125impl UpdateFrom<durable::Database> for Database {
126 fn update_from(
127 &mut self,
128 durable::Database {
129 id,
130 oid,
131 name,
132 owner_id,
133 privileges,
134 }: durable::Database,
135 ) {
136 self.id = id;
137 self.oid = oid;
138 self.name = name;
139 self.owner_id = owner_id;
140 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
141 }
142}
143
144#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
145pub struct Schema {
146 pub name: QualifiedSchemaName,
147 pub id: SchemaSpecifier,
148 pub oid: u32,
149 pub items: BTreeMap<String, CatalogItemId>,
150 pub functions: BTreeMap<String, CatalogItemId>,
151 pub types: BTreeMap<String, CatalogItemId>,
152 pub owner_id: RoleId,
153 pub privileges: PrivilegeMap,
154}
155
156impl From<Schema> for durable::Schema {
157 fn from(schema: Schema) -> durable::Schema {
158 durable::Schema {
159 id: schema.id.into(),
160 oid: schema.oid,
161 name: schema.name.schema,
162 database_id: schema.name.database.id(),
163 owner_id: schema.owner_id,
164 privileges: schema.privileges.into_all_values().collect(),
165 }
166 }
167}
168
169impl From<durable::Schema> for Schema {
170 fn from(
171 durable::Schema {
172 id,
173 oid,
174 name,
175 database_id,
176 owner_id,
177 privileges,
178 }: durable::Schema,
179 ) -> Schema {
180 Schema {
181 name: QualifiedSchemaName {
182 database: database_id.into(),
183 schema: name,
184 },
185 id: id.into(),
186 oid,
187 items: BTreeMap::new(),
188 functions: BTreeMap::new(),
189 types: BTreeMap::new(),
190 owner_id,
191 privileges: PrivilegeMap::from_mz_acl_items(privileges),
192 }
193 }
194}
195
196impl UpdateFrom<durable::Schema> for Schema {
197 fn update_from(
198 &mut self,
199 durable::Schema {
200 id,
201 oid,
202 name,
203 database_id,
204 owner_id,
205 privileges,
206 }: durable::Schema,
207 ) {
208 self.name = QualifiedSchemaName {
209 database: database_id.into(),
210 schema: name,
211 };
212 self.id = id.into();
213 self.oid = oid;
214 self.owner_id = owner_id;
215 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
216 }
217}
218
219#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
220pub struct Role {
221 pub name: String,
222 pub id: RoleId,
223 pub oid: u32,
224 pub attributes: RoleAttributes,
225 pub membership: RoleMembership,
226 pub vars: RoleVars,
227}
228
229impl Role {
230 pub fn is_user(&self) -> bool {
231 self.id.is_user()
232 }
233
234 pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
235 self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
236 }
237}
238
239impl From<Role> for durable::Role {
240 fn from(role: Role) -> durable::Role {
241 durable::Role {
242 id: role.id,
243 oid: role.oid,
244 name: role.name,
245 attributes: role.attributes,
246 membership: role.membership,
247 vars: role.vars,
248 }
249 }
250}
251
252impl From<durable::Role> for Role {
253 fn from(
254 durable::Role {
255 id,
256 oid,
257 name,
258 attributes,
259 membership,
260 vars,
261 }: durable::Role,
262 ) -> Self {
263 Role {
264 name,
265 id,
266 oid,
267 attributes,
268 membership,
269 vars,
270 }
271 }
272}
273
274impl UpdateFrom<durable::Role> for Role {
275 fn update_from(
276 &mut self,
277 durable::Role {
278 id,
279 oid,
280 name,
281 attributes,
282 membership,
283 vars,
284 }: durable::Role,
285 ) {
286 self.id = id;
287 self.oid = oid;
288 self.name = name;
289 self.attributes = attributes;
290 self.membership = membership;
291 self.vars = vars;
292 }
293}
294
295#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
296pub struct RoleAuth {
297 pub role_id: RoleId,
298 pub password_hash: Option<String>,
299 pub updated_at: u64,
300}
301
302impl From<RoleAuth> for durable::RoleAuth {
303 fn from(role_auth: RoleAuth) -> durable::RoleAuth {
304 durable::RoleAuth {
305 role_id: role_auth.role_id,
306 password_hash: role_auth.password_hash,
307 updated_at: role_auth.updated_at,
308 }
309 }
310}
311
312impl From<durable::RoleAuth> for RoleAuth {
313 fn from(
314 durable::RoleAuth {
315 role_id,
316 password_hash,
317 updated_at,
318 }: durable::RoleAuth,
319 ) -> RoleAuth {
320 RoleAuth {
321 role_id,
322 password_hash,
323 updated_at,
324 }
325 }
326}
327
328impl UpdateFrom<durable::RoleAuth> for RoleAuth {
329 fn update_from(&mut self, from: durable::RoleAuth) {
330 self.role_id = from.role_id;
331 self.password_hash = from.password_hash;
332 }
333}
334
335#[derive(Debug, Serialize, Clone, PartialEq)]
336pub struct Cluster {
337 pub name: String,
338 pub id: ClusterId,
339 pub config: ClusterConfig,
340 #[serde(skip)]
341 pub log_indexes: BTreeMap<LogVariant, GlobalId>,
342 pub bound_objects: BTreeSet<CatalogItemId>,
345 pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
346 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
347 pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
348 pub owner_id: RoleId,
349 pub privileges: PrivilegeMap,
350}
351
352impl Cluster {
353 pub fn role(&self) -> ClusterRole {
355 if self.name == MZ_SYSTEM_CLUSTER.name {
358 ClusterRole::SystemCritical
359 } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
360 ClusterRole::System
361 } else {
362 ClusterRole::User
363 }
364 }
365
366 pub fn is_managed(&self) -> bool {
368 matches!(self.config.variant, ClusterVariant::Managed { .. })
369 }
370
371 pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
373 self.replicas().filter(|r| !r.config.location.internal())
374 }
375
376 pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
378 self.replicas_by_id_.values()
379 }
380
381 pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
383 self.replicas_by_id_.get(&replica_id)
384 }
385
386 pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
388 self.replica_id_by_name_.get(name).copied()
389 }
390
391 pub fn availability_zones(&self) -> Option<&[String]> {
393 match &self.config.variant {
394 ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
395 ClusterVariant::Unmanaged => None,
396 }
397 }
398
399 pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
400 let name = self.name.clone();
401 let variant = match &self.config.variant {
402 ClusterVariant::Managed(ClusterVariantManaged {
403 size,
404 availability_zones,
405 logging,
406 replication_factor,
407 disk,
408 optimizer_feature_overrides,
409 schedule,
410 }) => {
411 let introspection = match logging {
412 ReplicaLogging {
413 log_logging,
414 interval: Some(interval),
415 } => Some(ComputeReplicaIntrospectionConfig {
416 debugging: *log_logging,
417 interval: interval.clone(),
418 }),
419 ReplicaLogging {
420 log_logging: _,
421 interval: None,
422 } => None,
423 };
424 let compute = ComputeReplicaConfig { introspection };
425 CreateClusterVariant::Managed(CreateClusterManagedPlan {
426 replication_factor: replication_factor.clone(),
427 size: size.clone(),
428 availability_zones: availability_zones.clone(),
429 compute,
430 disk: disk.clone(),
431 optimizer_feature_overrides: optimizer_feature_overrides.clone(),
432 schedule: schedule.clone(),
433 })
434 }
435 ClusterVariant::Unmanaged => {
436 return Err(PlanError::Unsupported {
439 feature: "SHOW CREATE for unmanaged clusters".to_string(),
440 discussion_no: None,
441 });
442 }
443 };
444 let workload_class = self.config.workload_class.clone();
445 Ok(CreateClusterPlan {
446 name,
447 variant,
448 workload_class,
449 })
450 }
451}
452
453impl From<Cluster> for durable::Cluster {
454 fn from(cluster: Cluster) -> durable::Cluster {
455 durable::Cluster {
456 id: cluster.id,
457 name: cluster.name,
458 owner_id: cluster.owner_id,
459 privileges: cluster.privileges.into_all_values().collect(),
460 config: cluster.config.into(),
461 }
462 }
463}
464
465impl From<durable::Cluster> for Cluster {
466 fn from(
467 durable::Cluster {
468 id,
469 name,
470 owner_id,
471 privileges,
472 config,
473 }: durable::Cluster,
474 ) -> Self {
475 Cluster {
476 name: name.clone(),
477 id,
478 bound_objects: BTreeSet::new(),
479 log_indexes: BTreeMap::new(),
480 replica_id_by_name_: BTreeMap::new(),
481 replicas_by_id_: BTreeMap::new(),
482 owner_id,
483 privileges: PrivilegeMap::from_mz_acl_items(privileges),
484 config: config.into(),
485 }
486 }
487}
488
489impl UpdateFrom<durable::Cluster> for Cluster {
490 fn update_from(
491 &mut self,
492 durable::Cluster {
493 id,
494 name,
495 owner_id,
496 privileges,
497 config,
498 }: durable::Cluster,
499 ) {
500 self.id = id;
501 self.name = name;
502 self.owner_id = owner_id;
503 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
504 self.config = config.into();
505 }
506}
507
508#[derive(Debug, Serialize, Clone, PartialEq)]
509pub struct ClusterReplica {
510 pub name: String,
511 pub cluster_id: ClusterId,
512 pub replica_id: ReplicaId,
513 pub config: ReplicaConfig,
514 pub owner_id: RoleId,
515}
516
517impl From<ClusterReplica> for durable::ClusterReplica {
518 fn from(replica: ClusterReplica) -> durable::ClusterReplica {
519 durable::ClusterReplica {
520 cluster_id: replica.cluster_id,
521 replica_id: replica.replica_id,
522 name: replica.name,
523 config: replica.config.into(),
524 owner_id: replica.owner_id,
525 }
526 }
527}
528
529#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
530pub struct ClusterReplicaProcessStatus {
531 pub status: ClusterStatus,
532 pub time: DateTime<Utc>,
533}
534
535#[derive(Debug, Serialize, Clone, PartialEq)]
536pub struct SourceReferences {
537 pub updated_at: u64,
538 pub references: Vec<SourceReference>,
539}
540
541#[derive(Debug, Serialize, Clone, PartialEq)]
542pub struct SourceReference {
543 pub name: String,
544 pub namespace: Option<String>,
545 pub columns: Vec<String>,
546}
547
548impl From<SourceReference> for durable::SourceReference {
549 fn from(source_reference: SourceReference) -> durable::SourceReference {
550 durable::SourceReference {
551 name: source_reference.name,
552 namespace: source_reference.namespace,
553 columns: source_reference.columns,
554 }
555 }
556}
557
558impl SourceReferences {
559 pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
560 durable::SourceReferences {
561 source_id,
562 updated_at: self.updated_at,
563 references: self.references.into_iter().map(Into::into).collect(),
564 }
565 }
566}
567
568impl From<durable::SourceReference> for SourceReference {
569 fn from(source_reference: durable::SourceReference) -> SourceReference {
570 SourceReference {
571 name: source_reference.name,
572 namespace: source_reference.namespace,
573 columns: source_reference.columns,
574 }
575 }
576}
577
578impl From<durable::SourceReferences> for SourceReferences {
579 fn from(source_references: durable::SourceReferences) -> SourceReferences {
580 SourceReferences {
581 updated_at: source_references.updated_at,
582 references: source_references
583 .references
584 .into_iter()
585 .map(|source_reference| source_reference.into())
586 .collect(),
587 }
588 }
589}
590
591impl From<mz_sql::plan::SourceReference> for SourceReference {
592 fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
593 SourceReference {
594 name: source_reference.name,
595 namespace: source_reference.namespace,
596 columns: source_reference.columns,
597 }
598 }
599}
600
601impl From<mz_sql::plan::SourceReferences> for SourceReferences {
602 fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
603 SourceReferences {
604 updated_at: source_references.updated_at,
605 references: source_references
606 .references
607 .into_iter()
608 .map(|source_reference| source_reference.into())
609 .collect(),
610 }
611 }
612}
613
614impl From<SourceReferences> for mz_sql::plan::SourceReferences {
615 fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
616 mz_sql::plan::SourceReferences {
617 updated_at: source_references.updated_at,
618 references: source_references
619 .references
620 .into_iter()
621 .map(|source_reference| source_reference.into())
622 .collect(),
623 }
624 }
625}
626
627impl From<SourceReference> for mz_sql::plan::SourceReference {
628 fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
629 mz_sql::plan::SourceReference {
630 name: source_reference.name,
631 namespace: source_reference.namespace,
632 columns: source_reference.columns,
633 }
634 }
635}
636
637#[derive(Clone, Debug, Serialize)]
638pub struct CatalogEntry {
639 pub item: CatalogItem,
640 #[serde(skip)]
641 pub referenced_by: Vec<CatalogItemId>,
642 #[serde(skip)]
646 pub used_by: Vec<CatalogItemId>,
647 pub id: CatalogItemId,
648 pub oid: u32,
649 pub name: QualifiedItemName,
650 pub owner_id: RoleId,
651 pub privileges: PrivilegeMap,
652}
653
654#[derive(Clone, Debug)]
672pub struct CatalogCollectionEntry {
673 pub entry: CatalogEntry,
674 pub version: RelationVersionSelector,
675}
676
677impl CatalogCollectionEntry {
678 pub fn desc(&self, name: &FullItemName) -> Result<Cow<RelationDesc>, SqlCatalogError> {
679 self.item().desc(name, self.version)
680 }
681
682 pub fn desc_opt(&self) -> Option<Cow<RelationDesc>> {
683 self.item().desc_opt(self.version)
684 }
685}
686
687impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
688 fn desc(&self, name: &FullItemName) -> Result<Cow<RelationDesc>, SqlCatalogError> {
689 CatalogCollectionEntry::desc(self, name)
690 }
691
692 fn global_id(&self) -> GlobalId {
693 match self.entry.item() {
694 CatalogItem::Source(source) => source.global_id,
695 CatalogItem::Log(log) => log.global_id,
696 CatalogItem::View(view) => view.global_id,
697 CatalogItem::MaterializedView(mv) => mv.global_id,
698 CatalogItem::Sink(sink) => sink.global_id,
699 CatalogItem::Index(index) => index.global_id,
700 CatalogItem::Type(ty) => ty.global_id,
701 CatalogItem::Func(func) => func.global_id,
702 CatalogItem::Secret(secret) => secret.global_id,
703 CatalogItem::Connection(conn) => conn.global_id,
704 CatalogItem::ContinualTask(ct) => ct.global_id,
705 CatalogItem::Table(table) => match self.version {
706 RelationVersionSelector::Latest => {
707 let (_version, gid) = table
708 .collections
709 .last_key_value()
710 .expect("at least one version");
711 *gid
712 }
713 RelationVersionSelector::Specific(version) => table
714 .collections
715 .get(&version)
716 .expect("catalog corruption, missing version!")
717 .clone(),
718 },
719 }
720 }
721}
722
723impl Deref for CatalogCollectionEntry {
724 type Target = CatalogEntry;
725
726 fn deref(&self) -> &CatalogEntry {
727 &self.entry
728 }
729}
730
731impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
732 fn name(&self) -> &QualifiedItemName {
733 self.entry.name()
734 }
735
736 fn id(&self) -> CatalogItemId {
737 self.entry.id()
738 }
739
740 fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
741 Box::new(self.entry.global_ids())
742 }
743
744 fn oid(&self) -> u32 {
745 self.entry.oid()
746 }
747
748 fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
749 self.entry.func()
750 }
751
752 fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
753 self.entry.source_desc()
754 }
755
756 fn connection(
757 &self,
758 ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
759 {
760 mz_sql::catalog::CatalogItem::connection(&self.entry)
761 }
762
763 fn create_sql(&self) -> &str {
764 self.entry.create_sql()
765 }
766
767 fn item_type(&self) -> SqlCatalogItemType {
768 self.entry.item_type()
769 }
770
771 fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
772 self.entry.index_details()
773 }
774
775 fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
776 self.entry.writable_table_details()
777 }
778
779 fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
780 self.entry.type_details()
781 }
782
783 fn references(&self) -> &ResolvedIds {
784 self.entry.references()
785 }
786
787 fn uses(&self) -> BTreeSet<CatalogItemId> {
788 self.entry.uses()
789 }
790
791 fn referenced_by(&self) -> &[CatalogItemId] {
792 self.entry.referenced_by()
793 }
794
795 fn used_by(&self) -> &[CatalogItemId] {
796 self.entry.used_by()
797 }
798
799 fn subsource_details(
800 &self,
801 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
802 self.entry.subsource_details()
803 }
804
805 fn source_export_details(
806 &self,
807 ) -> Option<(
808 CatalogItemId,
809 &UnresolvedItemName,
810 &SourceExportDetails,
811 &SourceExportDataConfig<ReferencedConnection>,
812 )> {
813 self.entry.source_export_details()
814 }
815
816 fn is_progress_source(&self) -> bool {
817 self.entry.is_progress_source()
818 }
819
820 fn progress_id(&self) -> Option<CatalogItemId> {
821 self.entry.progress_id()
822 }
823
824 fn owner_id(&self) -> RoleId {
825 *self.entry.owner_id()
826 }
827
828 fn privileges(&self) -> &PrivilegeMap {
829 self.entry.privileges()
830 }
831
832 fn cluster_id(&self) -> Option<ClusterId> {
833 self.entry.item().cluster_id()
834 }
835
836 fn at_version(
837 &self,
838 version: RelationVersionSelector,
839 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
840 Box::new(CatalogCollectionEntry {
841 entry: self.entry.clone(),
842 version,
843 })
844 }
845
846 fn latest_version(&self) -> Option<RelationVersion> {
847 self.entry.latest_version()
848 }
849}
850
851#[derive(Debug, Clone, Serialize)]
852pub enum CatalogItem {
853 Table(Table),
854 Source(Source),
855 Log(Log),
856 View(View),
857 MaterializedView(MaterializedView),
858 Sink(Sink),
859 Index(Index),
860 Type(Type),
861 Func(Func),
862 Secret(Secret),
863 Connection(Connection),
864 ContinualTask(ContinualTask),
865}
866
867impl From<CatalogEntry> for durable::Item {
868 fn from(entry: CatalogEntry) -> durable::Item {
869 let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
870 durable::Item {
871 id: entry.id,
872 oid: entry.oid,
873 global_id,
874 schema_id: entry.name.qualifiers.schema_spec.into(),
875 name: entry.name.item,
876 create_sql,
877 owner_id: entry.owner_id,
878 privileges: entry.privileges.into_all_values().collect(),
879 extra_versions,
880 }
881 }
882}
883
884#[derive(Debug, Clone, Serialize)]
885pub struct Table {
886 pub create_sql: Option<String>,
888 pub desc: VersionedRelationDesc,
890 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
892 pub collections: BTreeMap<RelationVersion, GlobalId>,
893 #[serde(skip)]
895 pub conn_id: Option<ConnectionId>,
896 pub resolved_ids: ResolvedIds,
898 pub custom_logical_compaction_window: Option<CompactionWindow>,
900 pub is_retained_metrics_object: bool,
905 pub data_source: TableDataSource,
907}
908
909impl Table {
910 pub fn timeline(&self) -> Timeline {
911 match &self.data_source {
912 TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
915 TableDataSource::DataSource { timeline, .. } => timeline.clone(),
916 }
917 }
918
919 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
921 self.collections.values().copied()
922 }
923
924 pub fn global_id_writes(&self) -> GlobalId {
926 self.collections
927 .last_key_value()
928 .expect("at least one version of a table")
929 .1
930 .clone()
931 }
932
933 pub fn collection_descs(
935 &self,
936 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
937 self.collections.iter().map(|(version, gid)| {
938 let desc = self
939 .desc
940 .at_version(RelationVersionSelector::Specific(*version));
941 (*gid, *version, desc)
942 })
943 }
944
945 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
947 let (version, _gid) = self
948 .collections
949 .iter()
950 .find(|(_version, gid)| *gid == id)
951 .expect("GlobalId to exist");
952 self.desc
953 .at_version(RelationVersionSelector::Specific(*version))
954 }
955}
956
957#[derive(Clone, Debug, Serialize)]
958pub enum TableDataSource {
959 TableWrites {
961 #[serde(skip)]
962 defaults: Vec<Expr<Aug>>,
963 },
964
965 DataSource {
968 desc: DataSourceDesc,
969 timeline: Timeline,
970 },
971}
972
973#[derive(Debug, Clone, Serialize)]
974pub enum DataSourceDesc {
975 Ingestion {
977 ingestion_desc: PlanIngestion,
978 cluster_id: ClusterId,
979 },
980 IngestionExport {
988 ingestion_id: CatalogItemId,
989 external_reference: UnresolvedItemName,
990 details: SourceExportDetails,
991 data_config: SourceExportDataConfig<ReferencedConnection>,
992 },
993 Introspection(IntrospectionType),
995 Progress,
997 Webhook {
999 validate_using: Option<WebhookValidation>,
1001 body_format: WebhookBodyFormat,
1003 headers: WebhookHeaders,
1005 cluster_id: ClusterId,
1007 },
1008}
1009
1010impl DataSourceDesc {
1011 pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1013 match &self {
1014 DataSourceDesc::Ingestion { ingestion_desc, .. } => {
1015 match &ingestion_desc.desc.primary_export.encoding.as_ref() {
1016 Some(encoding) => match &encoding.key {
1017 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1018 None => (None, Some(encoding.value.type_())),
1019 },
1020 None => (None, None),
1021 }
1022 }
1023 DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1024 Some(encoding) => match &encoding.key {
1025 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1026 None => (None, Some(encoding.value.type_())),
1027 },
1028 None => (None, None),
1029 },
1030 DataSourceDesc::Introspection(_)
1031 | DataSourceDesc::Webhook { .. }
1032 | DataSourceDesc::Progress => (None, None),
1033 }
1034 }
1035
1036 pub fn envelope(&self) -> Option<&str> {
1038 fn envelope_string(envelope: &SourceEnvelope) -> &str {
1043 match envelope {
1044 SourceEnvelope::None(_) => "none",
1045 SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1046 mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1047 mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1048 "debezium"
1052 }
1053 mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1054 "upsert-value-err-inline"
1055 }
1056 },
1057 SourceEnvelope::CdcV2 => {
1058 "materialize"
1061 }
1062 }
1063 }
1064
1065 match self {
1066 DataSourceDesc::Ingestion { ingestion_desc, .. } => Some(envelope_string(
1071 &ingestion_desc.desc.primary_export.envelope,
1072 )),
1073 DataSourceDesc::IngestionExport { data_config, .. } => {
1074 Some(envelope_string(&data_config.envelope))
1075 }
1076 DataSourceDesc::Introspection(_)
1077 | DataSourceDesc::Webhook { .. }
1078 | DataSourceDesc::Progress => None,
1079 }
1080 }
1081}
1082
1083#[derive(Debug, Clone, Serialize)]
1084pub struct Source {
1085 pub create_sql: Option<String>,
1087 pub global_id: GlobalId,
1089 #[serde(skip)]
1091 pub data_source: DataSourceDesc,
1092 pub desc: RelationDesc,
1094 pub timeline: Timeline,
1096 pub resolved_ids: ResolvedIds,
1098 pub custom_logical_compaction_window: Option<CompactionWindow>,
1102 pub is_retained_metrics_object: bool,
1105}
1106
1107impl Source {
1108 pub fn new(
1115 plan: CreateSourcePlan,
1116 global_id: GlobalId,
1117 resolved_ids: ResolvedIds,
1118 custom_logical_compaction_window: Option<CompactionWindow>,
1119 is_retained_metrics_object: bool,
1120 ) -> Source {
1121 Source {
1122 create_sql: Some(plan.source.create_sql),
1123 data_source: match plan.source.data_source {
1124 mz_sql::plan::DataSourceDesc::Ingestion(ingestion_desc) => {
1125 DataSourceDesc::Ingestion {
1126 ingestion_desc,
1127 cluster_id: plan
1128 .in_cluster
1129 .expect("ingestion-based sources must be given a cluster ID"),
1130 }
1131 }
1132 mz_sql::plan::DataSourceDesc::Progress => {
1133 assert!(
1134 plan.in_cluster.is_none(),
1135 "subsources must not have a host config or cluster_id defined"
1136 );
1137 DataSourceDesc::Progress
1138 }
1139 mz_sql::plan::DataSourceDesc::IngestionExport {
1140 ingestion_id,
1141 external_reference,
1142 details,
1143 data_config,
1144 } => {
1145 assert!(
1146 plan.in_cluster.is_none(),
1147 "subsources must not have a host config or cluster_id defined"
1148 );
1149 DataSourceDesc::IngestionExport {
1150 ingestion_id,
1151 external_reference,
1152 details,
1153 data_config,
1154 }
1155 }
1156 mz_sql::plan::DataSourceDesc::Webhook {
1157 validate_using,
1158 body_format,
1159 headers,
1160 cluster_id,
1161 } => {
1162 mz_ore::soft_assert_or_log!(
1163 cluster_id.is_none(),
1164 "cluster_id set at Source level for Webhooks"
1165 );
1166 DataSourceDesc::Webhook {
1167 validate_using,
1168 body_format,
1169 headers,
1170 cluster_id: plan
1171 .in_cluster
1172 .expect("webhook sources must be given a cluster ID"),
1173 }
1174 }
1175 },
1176 desc: plan.source.desc,
1177 global_id,
1178 timeline: plan.timeline,
1179 resolved_ids,
1180 custom_logical_compaction_window: plan
1181 .source
1182 .compaction_window
1183 .or(custom_logical_compaction_window),
1184 is_retained_metrics_object,
1185 }
1186 }
1187
1188 pub fn source_type(&self) -> &str {
1190 match &self.data_source {
1191 DataSourceDesc::Ingestion { ingestion_desc, .. } => {
1192 ingestion_desc.desc.connection.name()
1193 }
1194 DataSourceDesc::Progress => "progress",
1195 DataSourceDesc::IngestionExport { .. } => "subsource",
1196 DataSourceDesc::Introspection(_) => "source",
1197 DataSourceDesc::Webhook { .. } => "webhook",
1198 }
1199 }
1200
1201 pub fn connection_id(&self) -> Option<CatalogItemId> {
1203 match &self.data_source {
1204 DataSourceDesc::Ingestion { ingestion_desc, .. } => {
1205 ingestion_desc.desc.connection.connection_id()
1206 }
1207 DataSourceDesc::IngestionExport { .. }
1208 | DataSourceDesc::Introspection(_)
1209 | DataSourceDesc::Webhook { .. }
1210 | DataSourceDesc::Progress => None,
1211 }
1212 }
1213
1214 pub fn global_id(&self) -> GlobalId {
1216 self.global_id
1217 }
1218
1219 pub fn user_controllable_persist_shard_count(&self) -> i64 {
1227 match &self.data_source {
1228 DataSourceDesc::Ingestion { ingestion_desc, .. } => {
1229 match &ingestion_desc.desc.connection {
1230 GenericSourceConnection::Postgres(_)
1234 | GenericSourceConnection::MySql(_)
1235 | GenericSourceConnection::SqlServer(_) => 0,
1236 GenericSourceConnection::LoadGenerator(lg) => {
1237 if lg.load_generator.views().is_empty() {
1239 1
1241 } else {
1242 0
1245 }
1246 }
1247 GenericSourceConnection::Kafka(_) => 1,
1248 }
1249 }
1250 DataSourceDesc::IngestionExport { .. } => 1,
1253 DataSourceDesc::Webhook { .. } => 1,
1254 DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => 0,
1257 }
1258 }
1259}
1260
1261#[derive(Debug, Clone, Serialize)]
1262pub struct Log {
1263 pub variant: LogVariant,
1265 pub global_id: GlobalId,
1267}
1268
1269impl Log {
1270 pub fn global_id(&self) -> GlobalId {
1272 self.global_id
1273 }
1274}
1275
1276#[derive(Debug, Clone, Serialize)]
1277pub struct Sink {
1278 pub create_sql: String,
1280 pub global_id: GlobalId,
1282 pub from: GlobalId,
1284 pub connection: StorageSinkConnection<ReferencedConnection>,
1286 pub envelope: SinkEnvelope,
1290 pub with_snapshot: bool,
1292 pub version: u64,
1294 pub resolved_ids: ResolvedIds,
1296 pub cluster_id: ClusterId,
1298}
1299
1300impl Sink {
1301 pub fn sink_type(&self) -> &str {
1302 self.connection.name()
1303 }
1304
1305 pub fn envelope(&self) -> Option<&str> {
1307 match &self.envelope {
1308 SinkEnvelope::Debezium => Some("debezium"),
1309 SinkEnvelope::Upsert => Some("upsert"),
1310 }
1311 }
1312
1313 pub fn combined_format(&self) -> Cow<'_, str> {
1318 let StorageSinkConnection::Kafka(connection) = &self.connection;
1319 connection.format.get_format_name()
1320 }
1321
1322 pub fn formats(&self) -> (Option<&str>, &str) {
1324 let StorageSinkConnection::Kafka(connection) = &self.connection;
1325 let key_format = connection
1326 .format
1327 .key_format
1328 .as_ref()
1329 .map(|format| format.get_format_name());
1330 let value_format = connection.format.value_format.get_format_name();
1331 (key_format, value_format)
1332 }
1333
1334 pub fn connection_id(&self) -> Option<CatalogItemId> {
1335 self.connection.connection_id()
1336 }
1337
1338 pub fn global_id(&self) -> GlobalId {
1340 self.global_id
1341 }
1342}
1343
1344#[derive(Debug, Clone, Serialize)]
1345pub struct View {
1346 pub create_sql: String,
1348 pub global_id: GlobalId,
1350 pub raw_expr: Arc<HirRelationExpr>,
1352 pub optimized_expr: Arc<OptimizedMirRelationExpr>,
1354 pub desc: RelationDesc,
1356 pub conn_id: Option<ConnectionId>,
1358 pub resolved_ids: ResolvedIds,
1360 pub dependencies: DependencyIds,
1362}
1363
1364impl View {
1365 pub fn global_id(&self) -> GlobalId {
1367 self.global_id
1368 }
1369}
1370
1371#[derive(Debug, Clone, Serialize)]
1372pub struct MaterializedView {
1373 pub create_sql: String,
1375 pub global_id: GlobalId,
1377 pub raw_expr: Arc<HirRelationExpr>,
1379 pub optimized_expr: Arc<OptimizedMirRelationExpr>,
1381 pub desc: RelationDesc,
1383 pub resolved_ids: ResolvedIds,
1385 pub dependencies: DependencyIds,
1387 pub cluster_id: ClusterId,
1389 pub non_null_assertions: Vec<usize>,
1393 pub custom_logical_compaction_window: Option<CompactionWindow>,
1395 pub refresh_schedule: Option<RefreshSchedule>,
1397 pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
1402}
1403
1404impl MaterializedView {
1405 pub fn global_id(&self) -> GlobalId {
1407 self.global_id
1408 }
1409}
1410
1411#[derive(Debug, Clone, Serialize)]
1412pub struct Index {
1413 pub create_sql: String,
1415 pub global_id: GlobalId,
1417 pub on: GlobalId,
1419 pub keys: Arc<[MirScalarExpr]>,
1421 pub conn_id: Option<ConnectionId>,
1423 pub resolved_ids: ResolvedIds,
1425 pub cluster_id: ClusterId,
1427 pub custom_logical_compaction_window: Option<CompactionWindow>,
1429 pub is_retained_metrics_object: bool,
1434}
1435
1436impl Index {
1437 pub fn global_id(&self) -> GlobalId {
1439 self.global_id
1440 }
1441}
1442
1443#[derive(Debug, Clone, Serialize)]
1444pub struct Type {
1445 pub create_sql: Option<String>,
1447 pub global_id: GlobalId,
1449 #[serde(skip)]
1450 pub details: CatalogTypeDetails<IdReference>,
1451 pub desc: Option<RelationDesc>,
1452 pub resolved_ids: ResolvedIds,
1454}
1455
1456#[derive(Debug, Clone, Serialize)]
1457pub struct Func {
1458 #[serde(skip)]
1460 pub inner: &'static mz_sql::func::Func,
1461 pub global_id: GlobalId,
1463}
1464
1465#[derive(Debug, Clone, Serialize)]
1466pub struct Secret {
1467 pub create_sql: String,
1469 pub global_id: GlobalId,
1471}
1472
1473#[derive(Debug, Clone, Serialize)]
1474pub struct Connection {
1475 pub create_sql: String,
1477 pub global_id: GlobalId,
1479 pub details: ConnectionDetails,
1481 pub resolved_ids: ResolvedIds,
1483}
1484
1485impl Connection {
1486 pub fn global_id(&self) -> GlobalId {
1488 self.global_id
1489 }
1490}
1491
1492#[derive(Debug, Clone, Serialize)]
1493pub struct ContinualTask {
1494 pub create_sql: String,
1496 pub global_id: GlobalId,
1498 pub input_id: GlobalId,
1500 pub with_snapshot: bool,
1501 pub raw_expr: Arc<HirRelationExpr>,
1506 pub desc: RelationDesc,
1508 pub resolved_ids: ResolvedIds,
1510 pub dependencies: DependencyIds,
1512 pub cluster_id: ClusterId,
1514 pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
1516}
1517
1518impl ContinualTask {
1519 pub fn global_id(&self) -> GlobalId {
1521 self.global_id
1522 }
1523}
1524
1525#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
1526pub struct NetworkPolicy {
1527 pub name: String,
1528 pub id: NetworkPolicyId,
1529 pub oid: u32,
1530 pub rules: Vec<NetworkPolicyRule>,
1531 pub owner_id: RoleId,
1532 pub privileges: PrivilegeMap,
1533}
1534
1535impl From<NetworkPolicy> for durable::NetworkPolicy {
1536 fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1537 durable::NetworkPolicy {
1538 id: policy.id,
1539 oid: policy.oid,
1540 name: policy.name,
1541 rules: policy.rules,
1542 owner_id: policy.owner_id,
1543 privileges: policy.privileges.into_all_values().collect(),
1544 }
1545 }
1546}
1547
1548impl From<durable::NetworkPolicy> for NetworkPolicy {
1549 fn from(
1550 durable::NetworkPolicy {
1551 id,
1552 oid,
1553 name,
1554 rules,
1555 owner_id,
1556 privileges,
1557 }: durable::NetworkPolicy,
1558 ) -> Self {
1559 NetworkPolicy {
1560 id,
1561 oid,
1562 name,
1563 rules,
1564 owner_id,
1565 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1566 }
1567 }
1568}
1569
1570impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1571 fn update_from(
1572 &mut self,
1573 durable::NetworkPolicy {
1574 id,
1575 oid,
1576 name,
1577 rules,
1578 owner_id,
1579 privileges,
1580 }: durable::NetworkPolicy,
1581 ) {
1582 self.id = id;
1583 self.oid = oid;
1584 self.name = name;
1585 self.rules = rules;
1586 self.owner_id = owner_id;
1587 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1588 }
1589}
1590
1591impl CatalogItem {
1592 pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1594 match self {
1595 CatalogItem::Table(_) => mz_sql::catalog::CatalogItemType::Table,
1596 CatalogItem::Source(_) => mz_sql::catalog::CatalogItemType::Source,
1597 CatalogItem::Log(_) => mz_sql::catalog::CatalogItemType::Source,
1598 CatalogItem::Sink(_) => mz_sql::catalog::CatalogItemType::Sink,
1599 CatalogItem::View(_) => mz_sql::catalog::CatalogItemType::View,
1600 CatalogItem::MaterializedView(_) => mz_sql::catalog::CatalogItemType::MaterializedView,
1601 CatalogItem::Index(_) => mz_sql::catalog::CatalogItemType::Index,
1602 CatalogItem::Type(_) => mz_sql::catalog::CatalogItemType::Type,
1603 CatalogItem::Func(_) => mz_sql::catalog::CatalogItemType::Func,
1604 CatalogItem::Secret(_) => mz_sql::catalog::CatalogItemType::Secret,
1605 CatalogItem::Connection(_) => mz_sql::catalog::CatalogItemType::Connection,
1606 CatalogItem::ContinualTask(_) => mz_sql::catalog::CatalogItemType::ContinualTask,
1607 }
1608 }
1609
1610 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1612 let gid = match self {
1613 CatalogItem::Source(source) => source.global_id,
1614 CatalogItem::Log(log) => log.global_id,
1615 CatalogItem::Sink(sink) => sink.global_id,
1616 CatalogItem::View(view) => view.global_id,
1617 CatalogItem::MaterializedView(mv) => mv.global_id,
1618 CatalogItem::ContinualTask(ct) => ct.global_id,
1619 CatalogItem::Index(index) => index.global_id,
1620 CatalogItem::Func(func) => func.global_id,
1621 CatalogItem::Type(ty) => ty.global_id,
1622 CatalogItem::Secret(secret) => secret.global_id,
1623 CatalogItem::Connection(conn) => conn.global_id,
1624 CatalogItem::Table(table) => {
1625 return itertools::Either::Left(table.collections.values().copied());
1626 }
1627 };
1628 itertools::Either::Right(std::iter::once(gid))
1629 }
1630
1631 pub fn latest_global_id(&self) -> GlobalId {
1635 match self {
1636 CatalogItem::Source(source) => source.global_id,
1637 CatalogItem::Log(log) => log.global_id,
1638 CatalogItem::Sink(sink) => sink.global_id,
1639 CatalogItem::View(view) => view.global_id,
1640 CatalogItem::MaterializedView(mv) => mv.global_id,
1641 CatalogItem::ContinualTask(ct) => ct.global_id,
1642 CatalogItem::Index(index) => index.global_id,
1643 CatalogItem::Func(func) => func.global_id,
1644 CatalogItem::Type(ty) => ty.global_id,
1645 CatalogItem::Secret(secret) => secret.global_id,
1646 CatalogItem::Connection(conn) => conn.global_id,
1647 CatalogItem::Table(table) => table.global_id_writes(),
1648 }
1649 }
1650
1651 pub fn is_storage_collection(&self) -> bool {
1653 match self {
1654 CatalogItem::Table(_)
1655 | CatalogItem::Source(_)
1656 | CatalogItem::MaterializedView(_)
1657 | CatalogItem::Sink(_)
1658 | CatalogItem::ContinualTask(_) => true,
1659 CatalogItem::Log(_)
1660 | CatalogItem::View(_)
1661 | CatalogItem::Index(_)
1662 | CatalogItem::Type(_)
1663 | CatalogItem::Func(_)
1664 | CatalogItem::Secret(_)
1665 | CatalogItem::Connection(_) => false,
1666 }
1667 }
1668
1669 pub fn desc(
1670 &self,
1671 name: &FullItemName,
1672 version: RelationVersionSelector,
1673 ) -> Result<Cow<RelationDesc>, SqlCatalogError> {
1674 self.desc_opt(version)
1675 .ok_or_else(|| SqlCatalogError::InvalidDependency {
1676 name: name.to_string(),
1677 typ: self.typ(),
1678 })
1679 }
1680
1681 pub fn desc_opt(&self, version: RelationVersionSelector) -> Option<Cow<RelationDesc>> {
1682 match &self {
1683 CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1684 CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1685 CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1686 CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1687 CatalogItem::MaterializedView(mview) => Some(Cow::Borrowed(&mview.desc)),
1688 CatalogItem::Type(typ) => typ.desc.as_ref().map(Cow::Borrowed),
1689 CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
1690 CatalogItem::Func(_)
1691 | CatalogItem::Index(_)
1692 | CatalogItem::Sink(_)
1693 | CatalogItem::Secret(_)
1694 | CatalogItem::Connection(_) => None,
1695 }
1696 }
1697
1698 pub fn func(
1699 &self,
1700 entry: &CatalogEntry,
1701 ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1702 match &self {
1703 CatalogItem::Func(func) => Ok(func.inner),
1704 _ => Err(SqlCatalogError::UnexpectedType {
1705 name: entry.name().item.to_string(),
1706 actual_type: entry.item_type(),
1707 expected_type: CatalogItemType::Func,
1708 }),
1709 }
1710 }
1711
1712 pub fn source_desc(
1713 &self,
1714 entry: &CatalogEntry,
1715 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1716 match &self {
1717 CatalogItem::Source(source) => match &source.data_source {
1718 DataSourceDesc::Ingestion { ingestion_desc, .. } => Ok(Some(&ingestion_desc.desc)),
1719 DataSourceDesc::IngestionExport { .. }
1720 | DataSourceDesc::Introspection(_)
1721 | DataSourceDesc::Webhook { .. }
1722 | DataSourceDesc::Progress => Ok(None),
1723 },
1724 _ => Err(SqlCatalogError::UnexpectedType {
1725 name: entry.name().item.to_string(),
1726 actual_type: entry.item_type(),
1727 expected_type: CatalogItemType::Source,
1728 }),
1729 }
1730 }
1731
1732 pub fn is_progress_source(&self) -> bool {
1734 matches!(
1735 self,
1736 CatalogItem::Source(Source {
1737 data_source: DataSourceDesc::Progress,
1738 ..
1739 })
1740 )
1741 }
1742
1743 pub fn references(&self) -> &ResolvedIds {
1746 static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1747 match self {
1748 CatalogItem::Func(_) => &*EMPTY,
1749 CatalogItem::Index(idx) => &idx.resolved_ids,
1750 CatalogItem::Sink(sink) => &sink.resolved_ids,
1751 CatalogItem::Source(source) => &source.resolved_ids,
1752 CatalogItem::Log(_) => &*EMPTY,
1753 CatalogItem::Table(table) => &table.resolved_ids,
1754 CatalogItem::Type(typ) => &typ.resolved_ids,
1755 CatalogItem::View(view) => &view.resolved_ids,
1756 CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1757 CatalogItem::Secret(_) => &*EMPTY,
1758 CatalogItem::Connection(connection) => &connection.resolved_ids,
1759 CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
1760 }
1761 }
1762
1763 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1769 let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1770 match self {
1771 CatalogItem::Func(_) => {}
1774 CatalogItem::Index(_) => {}
1775 CatalogItem::Sink(_) => {}
1776 CatalogItem::Source(_) => {}
1777 CatalogItem::Log(_) => {}
1778 CatalogItem::Table(_) => {}
1779 CatalogItem::Type(_) => {}
1780 CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1781 CatalogItem::MaterializedView(mview) => {
1782 uses.extend(mview.dependencies.0.iter().copied())
1783 }
1784 CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
1785 CatalogItem::Secret(_) => {}
1786 CatalogItem::Connection(_) => {}
1787 }
1788 uses
1789 }
1790
1791 pub fn conn_id(&self) -> Option<&ConnectionId> {
1794 match self {
1795 CatalogItem::View(view) => view.conn_id.as_ref(),
1796 CatalogItem::Index(index) => index.conn_id.as_ref(),
1797 CatalogItem::Table(table) => table.conn_id.as_ref(),
1798 CatalogItem::Log(_)
1799 | CatalogItem::Source(_)
1800 | CatalogItem::Sink(_)
1801 | CatalogItem::MaterializedView(_)
1802 | CatalogItem::Secret(_)
1803 | CatalogItem::Type(_)
1804 | CatalogItem::Func(_)
1805 | CatalogItem::Connection(_)
1806 | CatalogItem::ContinualTask(_) => None,
1807 }
1808 }
1809
1810 pub fn is_temporary(&self) -> bool {
1812 self.conn_id().is_some()
1813 }
1814
1815 pub fn rename_schema_refs(
1816 &self,
1817 database_name: &str,
1818 cur_schema_name: &str,
1819 new_schema_name: &str,
1820 ) -> Result<CatalogItem, (String, String)> {
1821 let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
1822 let mut create_stmt = mz_sql::parse::parse(&create_sql)
1823 .expect("invalid create sql persisted to catalog")
1824 .into_element()
1825 .ast;
1826
1827 mz_sql::ast::transform::create_stmt_rename_schema_refs(
1829 &mut create_stmt,
1830 database_name,
1831 cur_schema_name,
1832 new_schema_name,
1833 )?;
1834
1835 Ok(create_stmt.to_ast_string_stable())
1836 };
1837
1838 match self {
1839 CatalogItem::Table(i) => {
1840 let mut i = i.clone();
1841 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1842 Ok(CatalogItem::Table(i))
1843 }
1844 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1845 CatalogItem::Source(i) => {
1846 let mut i = i.clone();
1847 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1848 Ok(CatalogItem::Source(i))
1849 }
1850 CatalogItem::Sink(i) => {
1851 let mut i = i.clone();
1852 i.create_sql = do_rewrite(i.create_sql)?;
1853 Ok(CatalogItem::Sink(i))
1854 }
1855 CatalogItem::View(i) => {
1856 let mut i = i.clone();
1857 i.create_sql = do_rewrite(i.create_sql)?;
1858 Ok(CatalogItem::View(i))
1859 }
1860 CatalogItem::MaterializedView(i) => {
1861 let mut i = i.clone();
1862 i.create_sql = do_rewrite(i.create_sql)?;
1863 Ok(CatalogItem::MaterializedView(i))
1864 }
1865 CatalogItem::Index(i) => {
1866 let mut i = i.clone();
1867 i.create_sql = do_rewrite(i.create_sql)?;
1868 Ok(CatalogItem::Index(i))
1869 }
1870 CatalogItem::Secret(i) => {
1871 let mut i = i.clone();
1872 i.create_sql = do_rewrite(i.create_sql)?;
1873 Ok(CatalogItem::Secret(i))
1874 }
1875 CatalogItem::Connection(i) => {
1876 let mut i = i.clone();
1877 i.create_sql = do_rewrite(i.create_sql)?;
1878 Ok(CatalogItem::Connection(i))
1879 }
1880 CatalogItem::Type(i) => {
1881 let mut i = i.clone();
1882 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1883 Ok(CatalogItem::Type(i))
1884 }
1885 CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
1886 CatalogItem::ContinualTask(i) => {
1887 let mut i = i.clone();
1888 i.create_sql = do_rewrite(i.create_sql)?;
1889 Ok(CatalogItem::ContinualTask(i))
1890 }
1891 }
1892 }
1893
1894 pub fn rename_item_refs(
1898 &self,
1899 from: FullItemName,
1900 to_item_name: String,
1901 rename_self: bool,
1902 ) -> Result<CatalogItem, String> {
1903 let do_rewrite = |create_sql: String| -> Result<String, String> {
1904 let mut create_stmt = mz_sql::parse::parse(&create_sql)
1905 .expect("invalid create sql persisted to catalog")
1906 .into_element()
1907 .ast;
1908 if rename_self {
1909 mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
1910 }
1911 mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
1913 Ok(create_stmt.to_ast_string_stable())
1914 };
1915
1916 match self {
1917 CatalogItem::Table(i) => {
1918 let mut i = i.clone();
1919 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1920 Ok(CatalogItem::Table(i))
1921 }
1922 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1923 CatalogItem::Source(i) => {
1924 let mut i = i.clone();
1925 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1926 Ok(CatalogItem::Source(i))
1927 }
1928 CatalogItem::Sink(i) => {
1929 let mut i = i.clone();
1930 i.create_sql = do_rewrite(i.create_sql)?;
1931 Ok(CatalogItem::Sink(i))
1932 }
1933 CatalogItem::View(i) => {
1934 let mut i = i.clone();
1935 i.create_sql = do_rewrite(i.create_sql)?;
1936 Ok(CatalogItem::View(i))
1937 }
1938 CatalogItem::MaterializedView(i) => {
1939 let mut i = i.clone();
1940 i.create_sql = do_rewrite(i.create_sql)?;
1941 Ok(CatalogItem::MaterializedView(i))
1942 }
1943 CatalogItem::Index(i) => {
1944 let mut i = i.clone();
1945 i.create_sql = do_rewrite(i.create_sql)?;
1946 Ok(CatalogItem::Index(i))
1947 }
1948 CatalogItem::Secret(i) => {
1949 let mut i = i.clone();
1950 i.create_sql = do_rewrite(i.create_sql)?;
1951 Ok(CatalogItem::Secret(i))
1952 }
1953 CatalogItem::Func(_) | CatalogItem::Type(_) => {
1954 unreachable!("{}s cannot be renamed", self.typ())
1955 }
1956 CatalogItem::Connection(i) => {
1957 let mut i = i.clone();
1958 i.create_sql = do_rewrite(i.create_sql)?;
1959 Ok(CatalogItem::Connection(i))
1960 }
1961 CatalogItem::ContinualTask(i) => {
1962 let mut i = i.clone();
1963 i.create_sql = do_rewrite(i.create_sql)?;
1964 Ok(CatalogItem::ContinualTask(i))
1965 }
1966 }
1967 }
1968
1969 pub fn update_retain_history(
1972 &mut self,
1973 value: Option<Value>,
1974 window: CompactionWindow,
1975 ) -> Result<Option<WithOptionValue<Raw>>, ()> {
1976 let update = |mut ast: &mut Statement<Raw>| {
1977 macro_rules! update_retain_history {
1979 ( $stmt:ident, $opt:ident, $name:ident ) => {{
1980 let pos = $stmt
1982 .with_options
1983 .iter()
1984 .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
1986 if let Some(value) = value {
1987 let next = mz_sql_parser::ast::$opt {
1988 name: mz_sql_parser::ast::$name::RetainHistory,
1989 value: Some(WithOptionValue::RetainHistoryFor(value)),
1990 };
1991 if let Some(idx) = pos {
1992 let previous = $stmt.with_options[idx].clone();
1993 $stmt.with_options[idx] = next;
1994 previous.value
1995 } else {
1996 $stmt.with_options.push(next);
1997 None
1998 }
1999 } else {
2000 if let Some(idx) = pos {
2001 $stmt.with_options.swap_remove(idx).value
2002 } else {
2003 None
2004 }
2005 }
2006 }};
2007 }
2008 let previous = match &mut ast {
2009 Statement::CreateTable(stmt) => {
2010 update_retain_history!(stmt, TableOption, TableOptionName)
2011 }
2012 Statement::CreateIndex(stmt) => {
2013 update_retain_history!(stmt, IndexOption, IndexOptionName)
2014 }
2015 Statement::CreateSource(stmt) => {
2016 update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
2017 }
2018 Statement::CreateMaterializedView(stmt) => {
2019 update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
2020 }
2021 _ => {
2022 return Err(());
2023 }
2024 };
2025 Ok(previous)
2026 };
2027
2028 let res = self.update_sql(update)?;
2029 let cw = self
2030 .custom_logical_compaction_window_mut()
2031 .expect("item must have compaction window");
2032 *cw = Some(window);
2033 Ok(res)
2034 }
2035
2036 pub fn add_column(
2037 &mut self,
2038 name: ColumnName,
2039 typ: ColumnType,
2040 sql: RawDataType,
2041 ) -> Result<RelationVersion, PlanError> {
2042 let CatalogItem::Table(table) = self else {
2043 return Err(PlanError::Unsupported {
2044 feature: "adding columns to a non-Table".to_string(),
2045 discussion_no: None,
2046 });
2047 };
2048 let next_version = table.desc.add_column(name.clone(), typ);
2049
2050 let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2051 Statement::CreateTable(stmt) => {
2052 let version = ColumnOptionDef {
2053 name: None,
2054 option: ColumnOption::Versioned {
2055 action: ColumnVersioned::Added,
2056 version: next_version.into(),
2057 },
2058 };
2059 let column = ColumnDef {
2060 name: name.into(),
2061 data_type: sql,
2062 collation: None,
2063 options: vec![version],
2064 };
2065 stmt.columns.push(column);
2066 Ok(())
2067 }
2068 _ => Err(()),
2069 };
2070
2071 self.update_sql(update)
2072 .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2073 Ok(next_version)
2074 }
2075
2076 pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2079 where
2080 F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2081 {
2082 let create_sql = match self {
2083 CatalogItem::Table(Table { create_sql, .. })
2084 | CatalogItem::Type(Type { create_sql, .. })
2085 | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2086 CatalogItem::Sink(Sink { create_sql, .. })
2087 | CatalogItem::View(View { create_sql, .. })
2088 | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2089 | CatalogItem::Index(Index { create_sql, .. })
2090 | CatalogItem::Secret(Secret { create_sql, .. })
2091 | CatalogItem::Connection(Connection { create_sql, .. })
2092 | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
2093 CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2094 };
2095 let Some(create_sql) = create_sql else {
2096 return Err(());
2097 };
2098 let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2099 .expect("non-system items must be parseable")
2100 .into_element()
2101 .ast;
2102 debug!("rewrite: {}", ast.to_ast_string_redacted());
2103 let t = f(&mut ast)?;
2104 *create_sql = ast.to_ast_string_stable();
2105 debug!("rewrote: {}", ast.to_ast_string_redacted());
2106 Ok(t)
2107 }
2108
2109 pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2116 match self {
2117 CatalogItem::Index(index) => Some(index.cluster_id),
2118 CatalogItem::Table(_)
2119 | CatalogItem::Source(_)
2120 | CatalogItem::Log(_)
2121 | CatalogItem::View(_)
2122 | CatalogItem::MaterializedView(_)
2123 | CatalogItem::Sink(_)
2124 | CatalogItem::Type(_)
2125 | CatalogItem::Func(_)
2126 | CatalogItem::Secret(_)
2127 | CatalogItem::Connection(_)
2128 | CatalogItem::ContinualTask(_) => None,
2129 }
2130 }
2131
2132 pub fn cluster_id(&self) -> Option<ClusterId> {
2133 match self {
2134 CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2135 CatalogItem::Index(index) => Some(index.cluster_id),
2136 CatalogItem::Source(source) => match &source.data_source {
2137 DataSourceDesc::Ingestion { cluster_id, .. } => Some(*cluster_id),
2138 DataSourceDesc::IngestionExport { .. } => None,
2142 DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2143 DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => None,
2144 },
2145 CatalogItem::Sink(sink) => Some(sink.cluster_id),
2146 CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
2147 CatalogItem::Table(_)
2148 | CatalogItem::Log(_)
2149 | CatalogItem::View(_)
2150 | CatalogItem::Type(_)
2151 | CatalogItem::Func(_)
2152 | CatalogItem::Secret(_)
2153 | CatalogItem::Connection(_) => None,
2154 }
2155 }
2156
2157 pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2160 match self {
2161 CatalogItem::Table(table) => table.custom_logical_compaction_window,
2162 CatalogItem::Source(source) => source.custom_logical_compaction_window,
2163 CatalogItem::Index(index) => index.custom_logical_compaction_window,
2164 CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2165 CatalogItem::Log(_)
2166 | CatalogItem::View(_)
2167 | CatalogItem::Sink(_)
2168 | CatalogItem::Type(_)
2169 | CatalogItem::Func(_)
2170 | CatalogItem::Secret(_)
2171 | CatalogItem::Connection(_)
2172 | CatalogItem::ContinualTask(_) => None,
2173 }
2174 }
2175
2176 pub fn custom_logical_compaction_window_mut(
2180 &mut self,
2181 ) -> Option<&mut Option<CompactionWindow>> {
2182 let cw = match self {
2183 CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2184 CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2185 CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2186 CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2187 CatalogItem::Log(_)
2188 | CatalogItem::View(_)
2189 | CatalogItem::Sink(_)
2190 | CatalogItem::Type(_)
2191 | CatalogItem::Func(_)
2192 | CatalogItem::Secret(_)
2193 | CatalogItem::Connection(_)
2194 | CatalogItem::ContinualTask(_) => return None,
2195 };
2196 Some(cw)
2197 }
2198
2199 pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2207 let custom_logical_compaction_window = match self {
2208 CatalogItem::Table(_)
2209 | CatalogItem::Source(_)
2210 | CatalogItem::Index(_)
2211 | CatalogItem::MaterializedView(_)
2212 | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
2213 CatalogItem::Log(_)
2214 | CatalogItem::View(_)
2215 | CatalogItem::Sink(_)
2216 | CatalogItem::Type(_)
2217 | CatalogItem::Func(_)
2218 | CatalogItem::Secret(_)
2219 | CatalogItem::Connection(_) => return None,
2220 };
2221 Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2222 }
2223
2224 pub fn is_retained_metrics_object(&self) -> bool {
2228 match self {
2229 CatalogItem::Table(table) => table.is_retained_metrics_object,
2230 CatalogItem::Source(source) => source.is_retained_metrics_object,
2231 CatalogItem::Index(index) => index.is_retained_metrics_object,
2232 CatalogItem::Log(_)
2233 | CatalogItem::View(_)
2234 | CatalogItem::MaterializedView(_)
2235 | CatalogItem::Sink(_)
2236 | CatalogItem::Type(_)
2237 | CatalogItem::Func(_)
2238 | CatalogItem::Secret(_)
2239 | CatalogItem::Connection(_)
2240 | CatalogItem::ContinualTask(_) => false,
2241 }
2242 }
2243
2244 pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2245 match self {
2246 CatalogItem::Table(table) => {
2247 let create_sql = table
2248 .create_sql
2249 .clone()
2250 .expect("builtin tables cannot be serialized");
2251 let mut collections = table.collections.clone();
2252 let global_id = collections
2253 .remove(&RelationVersion::root())
2254 .expect("at least one version");
2255 (create_sql, global_id, collections)
2256 }
2257 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2258 CatalogItem::Source(source) => {
2259 assert!(
2260 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2261 "cannot serialize introspection/builtin sources",
2262 );
2263 let create_sql = source
2264 .create_sql
2265 .clone()
2266 .expect("builtin sources cannot be serialized");
2267 (create_sql, source.global_id, BTreeMap::new())
2268 }
2269 CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2270 CatalogItem::MaterializedView(mview) => {
2271 (mview.create_sql.clone(), mview.global_id, BTreeMap::new())
2272 }
2273 CatalogItem::Index(index) => {
2274 (index.create_sql.clone(), index.global_id, BTreeMap::new())
2275 }
2276 CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2277 CatalogItem::Type(typ) => {
2278 let create_sql = typ
2279 .create_sql
2280 .clone()
2281 .expect("builtin types cannot be serialized");
2282 (create_sql, typ.global_id, BTreeMap::new())
2283 }
2284 CatalogItem::Secret(secret) => {
2285 (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2286 }
2287 CatalogItem::Connection(connection) => (
2288 connection.create_sql.clone(),
2289 connection.global_id,
2290 BTreeMap::new(),
2291 ),
2292 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2293 CatalogItem::ContinualTask(ct) => {
2294 (ct.create_sql.clone(), ct.global_id, BTreeMap::new())
2295 }
2296 }
2297 }
2298
2299 pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2300 match self {
2301 CatalogItem::Table(mut table) => {
2302 let create_sql = table
2303 .create_sql
2304 .expect("builtin tables cannot be serialized");
2305 let global_id = table
2306 .collections
2307 .remove(&RelationVersion::root())
2308 .expect("at least one version");
2309 (create_sql, global_id, table.collections)
2310 }
2311 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2312 CatalogItem::Source(source) => {
2313 assert!(
2314 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2315 "cannot serialize introspection/builtin sources",
2316 );
2317 let create_sql = source
2318 .create_sql
2319 .expect("builtin sources cannot be serialized");
2320 (create_sql, source.global_id, BTreeMap::new())
2321 }
2322 CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2323 CatalogItem::MaterializedView(mview) => {
2324 (mview.create_sql, mview.global_id, BTreeMap::new())
2325 }
2326 CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2327 CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2328 CatalogItem::Type(typ) => {
2329 let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2330 (create_sql, typ.global_id, BTreeMap::new())
2331 }
2332 CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2333 CatalogItem::Connection(connection) => {
2334 (connection.create_sql, connection.global_id, BTreeMap::new())
2335 }
2336 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2337 CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
2338 }
2339 }
2340}
2341
2342impl CatalogEntry {
2343 pub fn desc_latest(&self, name: &FullItemName) -> Result<Cow<RelationDesc>, SqlCatalogError> {
2348 self.item.desc(name, RelationVersionSelector::Latest)
2349 }
2350
2351 pub fn desc_opt_latest(&self) -> Option<Cow<RelationDesc>> {
2354 self.item.desc_opt(RelationVersionSelector::Latest)
2355 }
2356
2357 pub fn has_columns(&self) -> bool {
2359 self.desc_opt_latest().is_some()
2360 }
2361
2362 pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2364 self.item.func(self)
2365 }
2366
2367 pub fn index(&self) -> Option<&Index> {
2369 match self.item() {
2370 CatalogItem::Index(idx) => Some(idx),
2371 _ => None,
2372 }
2373 }
2374
2375 pub fn materialized_view(&self) -> Option<&MaterializedView> {
2377 match self.item() {
2378 CatalogItem::MaterializedView(mv) => Some(mv),
2379 _ => None,
2380 }
2381 }
2382
2383 pub fn table(&self) -> Option<&Table> {
2385 match self.item() {
2386 CatalogItem::Table(tbl) => Some(tbl),
2387 _ => None,
2388 }
2389 }
2390
2391 pub fn source(&self) -> Option<&Source> {
2393 match self.item() {
2394 CatalogItem::Source(src) => Some(src),
2395 _ => None,
2396 }
2397 }
2398
2399 pub fn sink(&self) -> Option<&Sink> {
2401 match self.item() {
2402 CatalogItem::Sink(sink) => Some(sink),
2403 _ => None,
2404 }
2405 }
2406
2407 pub fn secret(&self) -> Option<&Secret> {
2409 match self.item() {
2410 CatalogItem::Secret(secret) => Some(secret),
2411 _ => None,
2412 }
2413 }
2414
2415 pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2416 match self.item() {
2417 CatalogItem::Connection(connection) => Ok(connection),
2418 _ => {
2419 let db_name = match self.name().qualifiers.database_spec {
2420 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2421 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2422 };
2423 Err(SqlCatalogError::UnknownConnection(format!(
2424 "{}{}.{}",
2425 db_name,
2426 self.name().qualifiers.schema_spec,
2427 self.name().item
2428 )))
2429 }
2430 }
2431 }
2432
2433 pub fn source_desc(
2436 &self,
2437 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2438 self.item.source_desc(self)
2439 }
2440
2441 pub fn is_connection(&self) -> bool {
2443 matches!(self.item(), CatalogItem::Connection(_))
2444 }
2445
2446 pub fn is_table(&self) -> bool {
2448 matches!(self.item(), CatalogItem::Table(_))
2449 }
2450
2451 pub fn is_source(&self) -> bool {
2454 matches!(self.item(), CatalogItem::Source(_))
2455 }
2456
2457 pub fn subsource_details(
2460 &self,
2461 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2462 match &self.item() {
2463 CatalogItem::Source(source) => match &source.data_source {
2464 DataSourceDesc::IngestionExport {
2465 ingestion_id,
2466 external_reference,
2467 details,
2468 data_config: _,
2469 } => Some((*ingestion_id, external_reference, details)),
2470 _ => None,
2471 },
2472 _ => None,
2473 }
2474 }
2475
2476 pub fn source_export_details(
2479 &self,
2480 ) -> Option<(
2481 CatalogItemId,
2482 &UnresolvedItemName,
2483 &SourceExportDetails,
2484 &SourceExportDataConfig<ReferencedConnection>,
2485 )> {
2486 match &self.item() {
2487 CatalogItem::Source(source) => match &source.data_source {
2488 DataSourceDesc::IngestionExport {
2489 ingestion_id,
2490 external_reference,
2491 details,
2492 data_config,
2493 } => Some((*ingestion_id, external_reference, details, data_config)),
2494 _ => None,
2495 },
2496 CatalogItem::Table(table) => match &table.data_source {
2497 TableDataSource::DataSource {
2498 desc:
2499 DataSourceDesc::IngestionExport {
2500 ingestion_id,
2501 external_reference,
2502 details,
2503 data_config,
2504 },
2505 timeline: _,
2506 } => Some((*ingestion_id, external_reference, details, data_config)),
2507 _ => None,
2508 },
2509 _ => None,
2510 }
2511 }
2512
2513 pub fn is_progress_source(&self) -> bool {
2515 self.item().is_progress_source()
2516 }
2517
2518 pub fn progress_id(&self) -> Option<CatalogItemId> {
2520 match &self.item() {
2521 CatalogItem::Source(source) => match &source.data_source {
2522 DataSourceDesc::Ingestion { ingestion_desc, .. } => {
2523 Some(ingestion_desc.progress_subsource)
2524 }
2525 DataSourceDesc::IngestionExport { .. }
2526 | DataSourceDesc::Introspection(_)
2527 | DataSourceDesc::Progress
2528 | DataSourceDesc::Webhook { .. } => None,
2529 },
2530 CatalogItem::Table(_)
2531 | CatalogItem::Log(_)
2532 | CatalogItem::View(_)
2533 | CatalogItem::MaterializedView(_)
2534 | CatalogItem::Sink(_)
2535 | CatalogItem::Index(_)
2536 | CatalogItem::Type(_)
2537 | CatalogItem::Func(_)
2538 | CatalogItem::Secret(_)
2539 | CatalogItem::Connection(_)
2540 | CatalogItem::ContinualTask(_) => None,
2541 }
2542 }
2543
2544 pub fn is_sink(&self) -> bool {
2546 matches!(self.item(), CatalogItem::Sink(_))
2547 }
2548
2549 pub fn is_materialized_view(&self) -> bool {
2551 matches!(self.item(), CatalogItem::MaterializedView(_))
2552 }
2553
2554 pub fn is_view(&self) -> bool {
2556 matches!(self.item(), CatalogItem::View(_))
2557 }
2558
2559 pub fn is_secret(&self) -> bool {
2561 matches!(self.item(), CatalogItem::Secret(_))
2562 }
2563
2564 pub fn is_introspection_source(&self) -> bool {
2566 matches!(self.item(), CatalogItem::Log(_))
2567 }
2568
2569 pub fn is_index(&self) -> bool {
2571 matches!(self.item(), CatalogItem::Index(_))
2572 }
2573
2574 pub fn is_continual_task(&self) -> bool {
2576 matches!(self.item(), CatalogItem::ContinualTask(_))
2577 }
2578
2579 pub fn is_relation(&self) -> bool {
2581 mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
2582 }
2583
2584 pub fn references(&self) -> &ResolvedIds {
2587 self.item.references()
2588 }
2589
2590 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
2596 self.item.uses()
2597 }
2598
2599 pub fn item(&self) -> &CatalogItem {
2601 &self.item
2602 }
2603
2604 pub fn id(&self) -> CatalogItemId {
2606 self.id
2607 }
2608
2609 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
2611 self.item().global_ids()
2612 }
2613
2614 pub fn latest_global_id(&self) -> GlobalId {
2615 self.item().latest_global_id()
2616 }
2617
2618 pub fn oid(&self) -> u32 {
2620 self.oid
2621 }
2622
2623 pub fn name(&self) -> &QualifiedItemName {
2625 &self.name
2626 }
2627
2628 pub fn referenced_by(&self) -> &[CatalogItemId] {
2630 &self.referenced_by
2631 }
2632
2633 pub fn used_by(&self) -> &[CatalogItemId] {
2635 &self.used_by
2636 }
2637
2638 pub fn conn_id(&self) -> Option<&ConnectionId> {
2641 self.item.conn_id()
2642 }
2643
2644 pub fn owner_id(&self) -> &RoleId {
2646 &self.owner_id
2647 }
2648
2649 pub fn privileges(&self) -> &PrivilegeMap {
2651 &self.privileges
2652 }
2653}
2654
2655#[derive(Debug, Clone, Default)]
2656pub struct CommentsMap {
2657 map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
2658}
2659
2660impl CommentsMap {
2661 pub fn update_comment(
2662 &mut self,
2663 object_id: CommentObjectId,
2664 sub_component: Option<usize>,
2665 comment: Option<String>,
2666 ) -> Option<String> {
2667 let object_comments = self.map.entry(object_id).or_default();
2668
2669 let (empty, prev) = if let Some(comment) = comment {
2671 let prev = object_comments.insert(sub_component, comment);
2672 (false, prev)
2673 } else {
2674 let prev = object_comments.remove(&sub_component);
2675 (object_comments.is_empty(), prev)
2676 };
2677
2678 if empty {
2680 self.map.remove(&object_id);
2681 }
2682
2683 prev
2685 }
2686
2687 pub fn drop_comments(
2693 &mut self,
2694 object_ids: &BTreeSet<CommentObjectId>,
2695 ) -> Vec<(CommentObjectId, Option<usize>, String)> {
2696 let mut removed_comments = Vec::new();
2697
2698 for object_id in object_ids {
2699 if let Some(comments) = self.map.remove(object_id) {
2700 let removed = comments
2701 .into_iter()
2702 .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
2703 removed_comments.extend(removed);
2704 }
2705 }
2706
2707 removed_comments
2708 }
2709
2710 pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
2711 self.map
2712 .iter()
2713 .map(|(id, comments)| {
2714 comments
2715 .iter()
2716 .map(|(pos, comment)| (*id, *pos, comment.as_str()))
2717 })
2718 .flatten()
2719 }
2720
2721 pub fn get_object_comments(
2722 &self,
2723 object_id: CommentObjectId,
2724 ) -> Option<&BTreeMap<Option<usize>, String>> {
2725 self.map.get(&object_id)
2726 }
2727}
2728
2729impl Serialize for CommentsMap {
2730 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2731 where
2732 S: serde::Serializer,
2733 {
2734 let comment_count = self
2735 .map
2736 .iter()
2737 .map(|(_object_id, comments)| comments.len())
2738 .sum();
2739
2740 let mut seq = serializer.serialize_seq(Some(comment_count))?;
2741 for (object_id, sub) in &self.map {
2742 for (sub_component, comment) in sub {
2743 seq.serialize_element(&(
2744 format!("{object_id:?}"),
2745 format!("{sub_component:?}"),
2746 comment,
2747 ))?;
2748 }
2749 }
2750 seq.end()
2751 }
2752}
2753
2754#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
2755pub struct DefaultPrivileges {
2756 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
2757 privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
2758}
2759
2760#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
2763struct RoleDefaultPrivileges(
2764 #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
2766 BTreeMap<RoleId, DefaultPrivilegeAclItem>,
2767);
2768
2769impl Deref for RoleDefaultPrivileges {
2770 type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
2771
2772 fn deref(&self) -> &Self::Target {
2773 &self.0
2774 }
2775}
2776
2777impl DerefMut for RoleDefaultPrivileges {
2778 fn deref_mut(&mut self) -> &mut Self::Target {
2779 &mut self.0
2780 }
2781}
2782
2783impl DefaultPrivileges {
2784 pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
2786 if privilege.acl_mode.is_empty() {
2787 return;
2788 }
2789
2790 let privileges = self.privileges.entry(object).or_default();
2791 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2792 default_privilege.acl_mode |= privilege.acl_mode;
2793 } else {
2794 privileges.insert(privilege.grantee, privilege);
2795 }
2796 }
2797
2798 pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
2800 if let Some(privileges) = self.privileges.get_mut(object) {
2801 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2802 default_privilege.acl_mode =
2803 default_privilege.acl_mode.difference(privilege.acl_mode);
2804 if default_privilege.acl_mode.is_empty() {
2805 privileges.remove(&privilege.grantee);
2806 }
2807 }
2808 if privileges.is_empty() {
2809 self.privileges.remove(object);
2810 }
2811 }
2812 }
2813
2814 pub fn get_privileges_for_grantee(
2817 &self,
2818 object: &DefaultPrivilegeObject,
2819 grantee: &RoleId,
2820 ) -> Option<&AclMode> {
2821 self.privileges
2822 .get(object)
2823 .and_then(|privileges| privileges.get(grantee))
2824 .map(|privilege| &privilege.acl_mode)
2825 }
2826
2827 pub fn get_applicable_privileges(
2829 &self,
2830 role_id: RoleId,
2831 database_id: Option<DatabaseId>,
2832 schema_id: Option<SchemaId>,
2833 object_type: mz_sql::catalog::ObjectType,
2834 ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
2835 let privilege_object_type = if object_type.is_relation() {
2839 mz_sql::catalog::ObjectType::Table
2840 } else {
2841 object_type
2842 };
2843 let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
2844
2845 [
2849 DefaultPrivilegeObject {
2850 role_id,
2851 database_id,
2852 schema_id,
2853 object_type: privilege_object_type,
2854 },
2855 DefaultPrivilegeObject {
2856 role_id,
2857 database_id,
2858 schema_id: None,
2859 object_type: privilege_object_type,
2860 },
2861 DefaultPrivilegeObject {
2862 role_id,
2863 database_id: None,
2864 schema_id: None,
2865 object_type: privilege_object_type,
2866 },
2867 DefaultPrivilegeObject {
2868 role_id: RoleId::Public,
2869 database_id,
2870 schema_id,
2871 object_type: privilege_object_type,
2872 },
2873 DefaultPrivilegeObject {
2874 role_id: RoleId::Public,
2875 database_id,
2876 schema_id: None,
2877 object_type: privilege_object_type,
2878 },
2879 DefaultPrivilegeObject {
2880 role_id: RoleId::Public,
2881 database_id: None,
2882 schema_id: None,
2883 object_type: privilege_object_type,
2884 },
2885 ]
2886 .into_iter()
2887 .filter_map(|object| self.privileges.get(&object))
2888 .flat_map(|acl_map| acl_map.values())
2889 .fold(
2891 BTreeMap::new(),
2892 |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
2893 let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
2894 *accum_acl_mode |= *acl_mode;
2895 accum
2896 },
2897 )
2898 .into_iter()
2899 .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
2904 .filter(|(_, acl_mode)| !acl_mode.is_empty())
2906 .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
2907 grantee: *grantee,
2908 acl_mode,
2909 })
2910 }
2911
2912 pub fn iter(
2913 &self,
2914 ) -> impl Iterator<
2915 Item = (
2916 &DefaultPrivilegeObject,
2917 impl Iterator<Item = &DefaultPrivilegeAclItem>,
2918 ),
2919 > {
2920 self.privileges
2921 .iter()
2922 .map(|(object, acl_map)| (object, acl_map.values()))
2923 }
2924}
2925
2926#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
2927pub struct ClusterConfig {
2928 pub variant: ClusterVariant,
2929 pub workload_class: Option<String>,
2930}
2931
2932impl ClusterConfig {
2933 pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
2934 match &self.variant {
2935 ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
2936 ClusterVariant::Unmanaged => None,
2937 }
2938 }
2939}
2940
2941impl From<ClusterConfig> for durable::ClusterConfig {
2942 fn from(config: ClusterConfig) -> Self {
2943 Self {
2944 variant: config.variant.into(),
2945 workload_class: config.workload_class,
2946 }
2947 }
2948}
2949
2950impl From<durable::ClusterConfig> for ClusterConfig {
2951 fn from(config: durable::ClusterConfig) -> Self {
2952 Self {
2953 variant: config.variant.into(),
2954 workload_class: config.workload_class,
2955 }
2956 }
2957}
2958
2959#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
2960pub struct ClusterVariantManaged {
2961 pub size: String,
2962 pub availability_zones: Vec<String>,
2963 pub logging: ReplicaLogging,
2964 pub replication_factor: u32,
2965 pub disk: bool,
2966 pub optimizer_feature_overrides: OptimizerFeatureOverrides,
2967 pub schedule: ClusterSchedule,
2968}
2969
2970impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
2971 fn from(managed: ClusterVariantManaged) -> Self {
2972 Self {
2973 size: managed.size,
2974 availability_zones: managed.availability_zones,
2975 logging: managed.logging,
2976 replication_factor: managed.replication_factor,
2977 disk: managed.disk,
2978 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
2979 schedule: managed.schedule,
2980 }
2981 }
2982}
2983
2984impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
2985 fn from(managed: durable::ClusterVariantManaged) -> Self {
2986 Self {
2987 size: managed.size,
2988 availability_zones: managed.availability_zones,
2989 logging: managed.logging,
2990 replication_factor: managed.replication_factor,
2991 disk: managed.disk,
2992 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
2993 schedule: managed.schedule,
2994 }
2995 }
2996}
2997
2998#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
2999pub enum ClusterVariant {
3000 Managed(ClusterVariantManaged),
3001 Unmanaged,
3002}
3003
3004impl From<ClusterVariant> for durable::ClusterVariant {
3005 fn from(variant: ClusterVariant) -> Self {
3006 match variant {
3007 ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3008 ClusterVariant::Unmanaged => Self::Unmanaged,
3009 }
3010 }
3011}
3012
3013impl From<durable::ClusterVariant> for ClusterVariant {
3014 fn from(variant: durable::ClusterVariant) -> Self {
3015 match variant {
3016 durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3017 durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3018 }
3019 }
3020}
3021
3022impl mz_sql::catalog::CatalogDatabase for Database {
3023 fn name(&self) -> &str {
3024 &self.name
3025 }
3026
3027 fn id(&self) -> DatabaseId {
3028 self.id
3029 }
3030
3031 fn has_schemas(&self) -> bool {
3032 !self.schemas_by_name.is_empty()
3033 }
3034
3035 fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
3036 &self.schemas_by_name
3037 }
3038
3039 #[allow(clippy::as_conversions)]
3041 fn schemas(&self) -> Vec<&dyn CatalogSchema> {
3042 self.schemas_by_id
3043 .values()
3044 .map(|schema| schema as &dyn CatalogSchema)
3045 .collect()
3046 }
3047
3048 fn owner_id(&self) -> RoleId {
3049 self.owner_id
3050 }
3051
3052 fn privileges(&self) -> &PrivilegeMap {
3053 &self.privileges
3054 }
3055}
3056
3057impl mz_sql::catalog::CatalogSchema for Schema {
3058 fn database(&self) -> &ResolvedDatabaseSpecifier {
3059 &self.name.database
3060 }
3061
3062 fn name(&self) -> &QualifiedSchemaName {
3063 &self.name
3064 }
3065
3066 fn id(&self) -> &SchemaSpecifier {
3067 &self.id
3068 }
3069
3070 fn has_items(&self) -> bool {
3071 !self.items.is_empty()
3072 }
3073
3074 fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
3075 Box::new(
3076 self.items
3077 .values()
3078 .chain(self.functions.values())
3079 .chain(self.types.values())
3080 .copied(),
3081 )
3082 }
3083
3084 fn owner_id(&self) -> RoleId {
3085 self.owner_id
3086 }
3087
3088 fn privileges(&self) -> &PrivilegeMap {
3089 &self.privileges
3090 }
3091}
3092
3093impl mz_sql::catalog::CatalogRole for Role {
3094 fn name(&self) -> &str {
3095 &self.name
3096 }
3097
3098 fn id(&self) -> RoleId {
3099 self.id
3100 }
3101
3102 fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
3103 &self.membership.map
3104 }
3105
3106 fn attributes(&self) -> &RoleAttributes {
3107 &self.attributes
3108 }
3109
3110 fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
3111 &self.vars.map
3112 }
3113}
3114
3115impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
3116 fn name(&self) -> &str {
3117 &self.name
3118 }
3119
3120 fn id(&self) -> NetworkPolicyId {
3121 self.id
3122 }
3123
3124 fn owner_id(&self) -> RoleId {
3125 self.owner_id
3126 }
3127
3128 fn privileges(&self) -> &PrivilegeMap {
3129 &self.privileges
3130 }
3131}
3132
3133impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
3134 fn name(&self) -> &str {
3135 &self.name
3136 }
3137
3138 fn id(&self) -> ClusterId {
3139 self.id
3140 }
3141
3142 fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
3143 &self.bound_objects
3144 }
3145
3146 fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
3147 &self.replica_id_by_name_
3148 }
3149
3150 #[allow(clippy::as_conversions)]
3152 fn replicas(&self) -> Vec<&dyn CatalogClusterReplica> {
3153 self.replicas()
3154 .map(|replica| replica as &dyn CatalogClusterReplica)
3155 .collect()
3156 }
3157
3158 fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica {
3159 self.replica(id).expect("catalog out of sync")
3160 }
3161
3162 fn owner_id(&self) -> RoleId {
3163 self.owner_id
3164 }
3165
3166 fn privileges(&self) -> &PrivilegeMap {
3167 &self.privileges
3168 }
3169
3170 fn is_managed(&self) -> bool {
3171 self.is_managed()
3172 }
3173
3174 fn managed_size(&self) -> Option<&str> {
3175 match &self.config.variant {
3176 ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
3177 _ => None,
3178 }
3179 }
3180
3181 fn schedule(&self) -> Option<&ClusterSchedule> {
3182 match &self.config.variant {
3183 ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
3184 _ => None,
3185 }
3186 }
3187
3188 fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
3189 self.try_to_plan()
3190 }
3191}
3192
3193impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
3194 fn name(&self) -> &str {
3195 &self.name
3196 }
3197
3198 fn cluster_id(&self) -> ClusterId {
3199 self.cluster_id
3200 }
3201
3202 fn replica_id(&self) -> ReplicaId {
3203 self.replica_id
3204 }
3205
3206 fn owner_id(&self) -> RoleId {
3207 self.owner_id
3208 }
3209
3210 fn internal(&self) -> bool {
3211 self.config.location.internal()
3212 }
3213}
3214
3215impl mz_sql::catalog::CatalogItem for CatalogEntry {
3216 fn name(&self) -> &QualifiedItemName {
3217 self.name()
3218 }
3219
3220 fn id(&self) -> CatalogItemId {
3221 self.id()
3222 }
3223
3224 fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3225 Box::new(self.global_ids())
3226 }
3227
3228 fn oid(&self) -> u32 {
3229 self.oid()
3230 }
3231
3232 fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3233 self.func()
3234 }
3235
3236 fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3237 self.source_desc()
3238 }
3239
3240 fn connection(
3241 &self,
3242 ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3243 {
3244 Ok(self.connection()?.details.to_connection())
3245 }
3246
3247 fn create_sql(&self) -> &str {
3248 match self.item() {
3249 CatalogItem::Table(Table { create_sql, .. }) => {
3250 create_sql.as_deref().unwrap_or("<builtin>")
3251 }
3252 CatalogItem::Source(Source { create_sql, .. }) => {
3253 create_sql.as_deref().unwrap_or("<builtin>")
3254 }
3255 CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3256 CatalogItem::View(View { create_sql, .. }) => create_sql,
3257 CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3258 CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3259 CatalogItem::Type(Type { create_sql, .. }) => {
3260 create_sql.as_deref().unwrap_or("<builtin>")
3261 }
3262 CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3263 CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3264 CatalogItem::Func(_) => "<builtin>",
3265 CatalogItem::Log(_) => "<builtin>",
3266 CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
3267 }
3268 }
3269
3270 fn item_type(&self) -> SqlCatalogItemType {
3271 self.item().typ()
3272 }
3273
3274 fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3275 if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3276 Some((keys, *on))
3277 } else {
3278 None
3279 }
3280 }
3281
3282 fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3283 if let CatalogItem::Table(Table {
3284 data_source: TableDataSource::TableWrites { defaults },
3285 ..
3286 }) = self.item()
3287 {
3288 Some(defaults.as_slice())
3289 } else {
3290 None
3291 }
3292 }
3293
3294 fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3295 if let CatalogItem::Type(Type { details, .. }) = self.item() {
3296 Some(details)
3297 } else {
3298 None
3299 }
3300 }
3301
3302 fn references(&self) -> &ResolvedIds {
3303 self.references()
3304 }
3305
3306 fn uses(&self) -> BTreeSet<CatalogItemId> {
3307 self.uses()
3308 }
3309
3310 fn referenced_by(&self) -> &[CatalogItemId] {
3311 self.referenced_by()
3312 }
3313
3314 fn used_by(&self) -> &[CatalogItemId] {
3315 self.used_by()
3316 }
3317
3318 fn subsource_details(
3319 &self,
3320 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3321 self.subsource_details()
3322 }
3323
3324 fn source_export_details(
3325 &self,
3326 ) -> Option<(
3327 CatalogItemId,
3328 &UnresolvedItemName,
3329 &SourceExportDetails,
3330 &SourceExportDataConfig<ReferencedConnection>,
3331 )> {
3332 self.source_export_details()
3333 }
3334
3335 fn is_progress_source(&self) -> bool {
3336 self.is_progress_source()
3337 }
3338
3339 fn progress_id(&self) -> Option<CatalogItemId> {
3340 self.progress_id()
3341 }
3342
3343 fn owner_id(&self) -> RoleId {
3344 self.owner_id
3345 }
3346
3347 fn privileges(&self) -> &PrivilegeMap {
3348 &self.privileges
3349 }
3350
3351 fn cluster_id(&self) -> Option<ClusterId> {
3352 self.item().cluster_id()
3353 }
3354
3355 fn at_version(
3356 &self,
3357 version: RelationVersionSelector,
3358 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3359 Box::new(CatalogCollectionEntry {
3360 entry: self.clone(),
3361 version,
3362 })
3363 }
3364
3365 fn latest_version(&self) -> Option<RelationVersion> {
3366 self.table().map(|t| t.desc.latest_version())
3367 }
3368}
3369
3370#[derive(Debug)]
3372pub struct StateUpdate {
3373 pub kind: StateUpdateKind,
3374 pub ts: Timestamp,
3375 pub diff: StateDiff,
3376}
3377
3378#[derive(Debug, Clone)]
3382pub enum StateUpdateKind {
3383 Role(durable::objects::Role),
3384 RoleAuth(durable::objects::RoleAuth),
3385 Database(durable::objects::Database),
3386 Schema(durable::objects::Schema),
3387 DefaultPrivilege(durable::objects::DefaultPrivilege),
3388 SystemPrivilege(MzAclItem),
3389 SystemConfiguration(durable::objects::SystemConfiguration),
3390 Cluster(durable::objects::Cluster),
3391 NetworkPolicy(durable::objects::NetworkPolicy),
3392 IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3393 ClusterReplica(durable::objects::ClusterReplica),
3394 SourceReferences(durable::objects::SourceReferences),
3395 SystemObjectMapping(durable::objects::SystemObjectMapping),
3396 TemporaryItem(TemporaryItem),
3399 Item(durable::objects::Item),
3400 Comment(durable::objects::Comment),
3401 AuditLog(durable::objects::AuditLog),
3402 StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3404 UnfinalizedShard(durable::objects::UnfinalizedShard),
3405}
3406
3407#[derive(Debug, Clone, Copy, Eq, PartialEq)]
3409pub enum StateDiff {
3410 Retraction,
3411 Addition,
3412}
3413
3414impl From<StateDiff> for Diff {
3415 fn from(diff: StateDiff) -> Self {
3416 match diff {
3417 StateDiff::Retraction => Diff::MINUS_ONE,
3418 StateDiff::Addition => Diff::ONE,
3419 }
3420 }
3421}
3422impl TryFrom<Diff> for StateDiff {
3423 type Error = String;
3424
3425 fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3426 match diff {
3427 Diff::MINUS_ONE => Ok(Self::Retraction),
3428 Diff::ONE => Ok(Self::Addition),
3429 diff => Err(format!("invalid diff {diff}")),
3430 }
3431 }
3432}
3433
3434#[derive(Debug, Clone)]
3436pub struct TemporaryItem {
3437 pub id: CatalogItemId,
3438 pub oid: u32,
3439 pub name: QualifiedItemName,
3440 pub item: CatalogItem,
3441 pub owner_id: RoleId,
3442 pub privileges: PrivilegeMap,
3443}
3444
3445impl From<CatalogEntry> for TemporaryItem {
3446 fn from(entry: CatalogEntry) -> Self {
3447 TemporaryItem {
3448 id: entry.id,
3449 oid: entry.oid,
3450 name: entry.name,
3451 item: entry.item,
3452 owner_id: entry.owner_id,
3453 privileges: entry.privileges,
3454 }
3455 }
3456}
3457
3458#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
3460pub enum BootstrapStateUpdateKind {
3461 Role(durable::objects::Role),
3462 RoleAuth(durable::objects::RoleAuth),
3463 Database(durable::objects::Database),
3464 Schema(durable::objects::Schema),
3465 DefaultPrivilege(durable::objects::DefaultPrivilege),
3466 SystemPrivilege(MzAclItem),
3467 SystemConfiguration(durable::objects::SystemConfiguration),
3468 Cluster(durable::objects::Cluster),
3469 NetworkPolicy(durable::objects::NetworkPolicy),
3470 IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
3471 ClusterReplica(durable::objects::ClusterReplica),
3472 SourceReferences(durable::objects::SourceReferences),
3473 SystemObjectMapping(durable::objects::SystemObjectMapping),
3474 Item(durable::objects::Item),
3475 Comment(durable::objects::Comment),
3476 AuditLog(durable::objects::AuditLog),
3477 StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
3479 UnfinalizedShard(durable::objects::UnfinalizedShard),
3480}
3481
3482impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3483 fn from(value: BootstrapStateUpdateKind) -> Self {
3484 match value {
3485 BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3486 BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3487 BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3488 BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3489 BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3490 StateUpdateKind::DefaultPrivilege(kind)
3491 }
3492 BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3493 StateUpdateKind::SystemPrivilege(kind)
3494 }
3495 BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3496 StateUpdateKind::SystemConfiguration(kind)
3497 }
3498 BootstrapStateUpdateKind::SourceReferences(kind) => {
3499 StateUpdateKind::SourceReferences(kind)
3500 }
3501 BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3502 BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3503 BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3504 StateUpdateKind::IntrospectionSourceIndex(kind)
3505 }
3506 BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3507 BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3508 StateUpdateKind::SystemObjectMapping(kind)
3509 }
3510 BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3511 BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3512 BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3513 BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3514 StateUpdateKind::StorageCollectionMetadata(kind)
3515 }
3516 BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3517 StateUpdateKind::UnfinalizedShard(kind)
3518 }
3519 }
3520 }
3521}
3522
3523impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3524 type Error = TemporaryItem;
3525
3526 fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3527 match value {
3528 StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3529 StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3530 StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3531 StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3532 StateUpdateKind::DefaultPrivilege(kind) => {
3533 Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3534 }
3535 StateUpdateKind::SystemPrivilege(kind) => {
3536 Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3537 }
3538 StateUpdateKind::SystemConfiguration(kind) => {
3539 Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3540 }
3541 StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3542 StateUpdateKind::NetworkPolicy(kind) => {
3543 Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3544 }
3545 StateUpdateKind::IntrospectionSourceIndex(kind) => {
3546 Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3547 }
3548 StateUpdateKind::ClusterReplica(kind) => {
3549 Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3550 }
3551 StateUpdateKind::SourceReferences(kind) => {
3552 Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3553 }
3554 StateUpdateKind::SystemObjectMapping(kind) => {
3555 Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3556 }
3557 StateUpdateKind::TemporaryItem(kind) => Err(kind),
3558 StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3559 StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3560 StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3561 StateUpdateKind::StorageCollectionMetadata(kind) => {
3562 Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3563 }
3564 StateUpdateKind::UnfinalizedShard(kind) => {
3565 Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3566 }
3567 }
3568 }
3569}