1use std::borrow::Cow;
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::{Arc, LazyLock};
18
19use chrono::{DateTime, Utc};
20use mz_adapter_types::compaction::CompactionWindow;
21use mz_adapter_types::connection::ConnectionId;
22use mz_compute_client::logging::LogVariant;
23use mz_controller::clusters::{ClusterRole, ClusterStatus, ReplicaConfig, ReplicaLogging};
24use mz_controller_types::{ClusterId, ReplicaId};
25use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
26use mz_ore::collections::CollectionExt;
27use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
28use mz_repr::network_policy_id::NetworkPolicyId;
29use mz_repr::optimize::OptimizerFeatureOverrides;
30use mz_repr::refresh_schedule::RefreshSchedule;
31use mz_repr::role_id::RoleId;
32use mz_repr::{
33 CatalogItemId, ColumnName, Diff, GlobalId, RelationDesc, RelationVersion,
34 RelationVersionSelector, SqlColumnType, Timestamp, VersionedRelationDesc,
35};
36use mz_sql::ast::display::AstDisplay;
37use mz_sql::ast::{
38 ColumnDef, ColumnOption, ColumnOptionDef, ColumnVersioned, Expr, Raw, RawDataType, Statement,
39 UnresolvedItemName, Value, WithOptionValue,
40};
41use mz_sql::catalog::{
42 CatalogClusterReplica, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem,
43 CatalogItemType as SqlCatalogItemType, CatalogItemType, CatalogSchema, CatalogTypeDetails,
44 DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, RoleAttributes, RoleMembership,
45 RoleVars, SystemObjectType,
46};
47use mz_sql::names::{
48 Aug, CommentObjectId, DatabaseId, DependencyIds, FullItemName, QualifiedItemName,
49 QualifiedSchemaName, ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
50};
51use mz_sql::plan::{
52 ClusterSchedule, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails,
53 CreateClusterManagedPlan, CreateClusterPlan, CreateClusterVariant, CreateSourcePlan,
54 HirRelationExpr, Ingestion as PlanIngestion, NetworkPolicyRule, PlanError, WebhookBodyFormat,
55 WebhookHeaders, WebhookValidation,
56};
57use mz_sql::rbac;
58use mz_sql::session::vars::OwnedVarInput;
59use mz_storage_client::controller::IntrospectionType;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::sinks::{SinkEnvelope, StorageSinkConnection};
62use mz_storage_types::sources::{
63 GenericSourceConnection, SourceConnection, SourceDesc, SourceEnvelope, SourceExportDataConfig,
64 SourceExportDetails, Timeline,
65};
66use serde::ser::SerializeSeq;
67use serde::{Deserialize, Serialize};
68use timely::progress::Antichain;
69use tracing::debug;
70
71use crate::builtin::{MZ_CATALOG_SERVER_CLUSTER, MZ_SYSTEM_CLUSTER};
72use crate::durable;
73
/// Trait for types that can be refreshed in place from a value of type `T`, in addition to
/// being constructible from it via [`From`].
pub trait UpdateFrom<T>: From<T> {
    /// Overwrites state of `self` with the contents of `from`, consuming `from`.
    fn update_from(&mut self, from: T);
}
78
/// Catalog representation of a database. Unlike `durable::Database`, it also carries maps of
/// the schemas it contains.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Database {
    /// Name of the database.
    pub name: String,
    /// Identifier of the database.
    pub id: DatabaseId,
    /// OID of the database.
    pub oid: u32,
    /// Schemas keyed by [`SchemaId`]; serialized via `mz_ore::serde::map_key_to_string`.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub schemas_by_id: BTreeMap<SchemaId, Schema>,
    /// Lookup from schema name to [`SchemaId`].
    pub schemas_by_name: BTreeMap<String, SchemaId>,
    /// Role that owns the database.
    pub owner_id: RoleId,
    /// Privileges recorded for the database.
    pub privileges: PrivilegeMap,
}
90
91impl From<Database> for durable::Database {
92 fn from(database: Database) -> durable::Database {
93 durable::Database {
94 id: database.id,
95 oid: database.oid,
96 name: database.name,
97 owner_id: database.owner_id,
98 privileges: database.privileges.into_all_values().collect(),
99 }
100 }
101}
102
103impl From<durable::Database> for Database {
104 fn from(
105 durable::Database {
106 id,
107 oid,
108 name,
109 owner_id,
110 privileges,
111 }: durable::Database,
112 ) -> Database {
113 Database {
114 id,
115 oid,
116 schemas_by_id: BTreeMap::new(),
117 schemas_by_name: BTreeMap::new(),
118 name,
119 owner_id,
120 privileges: PrivilegeMap::from_mz_acl_items(privileges),
121 }
122 }
123}
124
125impl UpdateFrom<durable::Database> for Database {
126 fn update_from(
127 &mut self,
128 durable::Database {
129 id,
130 oid,
131 name,
132 owner_id,
133 privileges,
134 }: durable::Database,
135 ) {
136 self.id = id;
137 self.oid = oid;
138 self.name = name;
139 self.owner_id = owner_id;
140 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
141 }
142}
143
/// Catalog representation of a schema. Unlike `durable::Schema`, it also carries name-to-ID
/// maps for items, functions and types.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Schema {
    /// Qualified name: the database specifier plus the schema's own name.
    pub name: QualifiedSchemaName,
    /// Identifier of the schema.
    pub id: SchemaSpecifier,
    /// OID of the schema.
    pub oid: u32,
    /// Lookup from name to [`CatalogItemId`] for items.
    pub items: BTreeMap<String, CatalogItemId>,
    /// Lookup from name to [`CatalogItemId`] for functions.
    pub functions: BTreeMap<String, CatalogItemId>,
    /// Lookup from name to [`CatalogItemId`] for types.
    pub types: BTreeMap<String, CatalogItemId>,
    /// Role that owns the schema.
    pub owner_id: RoleId,
    /// Privileges recorded for the schema.
    pub privileges: PrivilegeMap,
}
155
156impl From<Schema> for durable::Schema {
157 fn from(schema: Schema) -> durable::Schema {
158 durable::Schema {
159 id: schema.id.into(),
160 oid: schema.oid,
161 name: schema.name.schema,
162 database_id: schema.name.database.id(),
163 owner_id: schema.owner_id,
164 privileges: schema.privileges.into_all_values().collect(),
165 }
166 }
167}
168
169impl From<durable::Schema> for Schema {
170 fn from(
171 durable::Schema {
172 id,
173 oid,
174 name,
175 database_id,
176 owner_id,
177 privileges,
178 }: durable::Schema,
179 ) -> Schema {
180 Schema {
181 name: QualifiedSchemaName {
182 database: database_id.into(),
183 schema: name,
184 },
185 id: id.into(),
186 oid,
187 items: BTreeMap::new(),
188 functions: BTreeMap::new(),
189 types: BTreeMap::new(),
190 owner_id,
191 privileges: PrivilegeMap::from_mz_acl_items(privileges),
192 }
193 }
194}
195
196impl UpdateFrom<durable::Schema> for Schema {
197 fn update_from(
198 &mut self,
199 durable::Schema {
200 id,
201 oid,
202 name,
203 database_id,
204 owner_id,
205 privileges,
206 }: durable::Schema,
207 ) {
208 self.name = QualifiedSchemaName {
209 database: database_id.into(),
210 schema: name,
211 };
212 self.id = id.into();
213 self.oid = oid;
214 self.owner_id = owner_id;
215 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
216 }
217}
218
/// Catalog representation of a role; it has the same fields as `durable::Role`.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct Role {
    /// Name of the role.
    pub name: String,
    /// Identifier of the role.
    pub id: RoleId,
    /// OID of the role.
    pub oid: u32,
    /// Attributes of the role.
    pub attributes: RoleAttributes,
    /// Membership information of the role.
    pub membership: RoleMembership,
    /// Variables stored for the role.
    pub vars: RoleVars,
}
228
229impl Role {
230 pub fn is_user(&self) -> bool {
231 self.id.is_user()
232 }
233
234 pub fn vars<'a>(&'a self) -> impl Iterator<Item = (&'a str, &'a OwnedVarInput)> {
235 self.vars.map.iter().map(|(name, val)| (name.as_str(), val))
236 }
237}
238
239impl From<Role> for durable::Role {
240 fn from(role: Role) -> durable::Role {
241 durable::Role {
242 id: role.id,
243 oid: role.oid,
244 name: role.name,
245 attributes: role.attributes,
246 membership: role.membership,
247 vars: role.vars,
248 }
249 }
250}
251
252impl From<durable::Role> for Role {
253 fn from(
254 durable::Role {
255 id,
256 oid,
257 name,
258 attributes,
259 membership,
260 vars,
261 }: durable::Role,
262 ) -> Self {
263 Role {
264 name,
265 id,
266 oid,
267 attributes,
268 membership,
269 vars,
270 }
271 }
272}
273
274impl UpdateFrom<durable::Role> for Role {
275 fn update_from(
276 &mut self,
277 durable::Role {
278 id,
279 oid,
280 name,
281 attributes,
282 membership,
283 vars,
284 }: durable::Role,
285 ) {
286 self.id = id;
287 self.oid = oid;
288 self.name = name;
289 self.attributes = attributes;
290 self.membership = membership;
291 self.vars = vars;
292 }
293}
294
/// Catalog representation of a role's authentication data; it has the same fields as
/// `durable::RoleAuth`.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct RoleAuth {
    /// Role this authentication record belongs to.
    pub role_id: RoleId,
    /// Password hash, if one is stored.
    pub password_hash: Option<String>,
    /// Time of the last update. NOTE(review): unit/epoch is not visible here — confirm.
    pub updated_at: u64,
}
301
302impl From<RoleAuth> for durable::RoleAuth {
303 fn from(role_auth: RoleAuth) -> durable::RoleAuth {
304 durable::RoleAuth {
305 role_id: role_auth.role_id,
306 password_hash: role_auth.password_hash,
307 updated_at: role_auth.updated_at,
308 }
309 }
310}
311
312impl From<durable::RoleAuth> for RoleAuth {
313 fn from(
314 durable::RoleAuth {
315 role_id,
316 password_hash,
317 updated_at,
318 }: durable::RoleAuth,
319 ) -> RoleAuth {
320 RoleAuth {
321 role_id,
322 password_hash,
323 updated_at,
324 }
325 }
326}
327
328impl UpdateFrom<durable::RoleAuth> for RoleAuth {
329 fn update_from(&mut self, from: durable::RoleAuth) {
330 self.role_id = from.role_id;
331 self.password_hash = from.password_hash;
332 }
333}
334
/// Catalog representation of a cluster. Unlike `durable::Cluster`, it also carries log
/// indexes, bound objects and replica maps.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct Cluster {
    /// Name of the cluster.
    pub name: String,
    /// Identifier of the cluster.
    pub id: ClusterId,
    /// Configuration of the cluster.
    pub config: ClusterConfig,
    /// Map from log variant to a [`GlobalId`]; skipped during serialization.
    #[serde(skip)]
    pub log_indexes: BTreeMap<LogVariant, GlobalId>,
    /// Catalog items bound to this cluster.
    pub bound_objects: BTreeSet<CatalogItemId>,
    /// Lookup from replica name to [`ReplicaId`]. Read through [`Cluster::replica_id`].
    pub replica_id_by_name_: BTreeMap<String, ReplicaId>,
    /// Replicas keyed by [`ReplicaId`]; serialized via `mz_ore::serde::map_key_to_string`.
    /// Read through [`Cluster::replicas`] and [`Cluster::replica`].
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub replicas_by_id_: BTreeMap<ReplicaId, ClusterReplica>,
    /// Role that owns the cluster.
    pub owner_id: RoleId,
    /// Privileges recorded for the cluster.
    pub privileges: PrivilegeMap,
}
351
352impl Cluster {
353 pub fn role(&self) -> ClusterRole {
355 if self.name == MZ_SYSTEM_CLUSTER.name {
358 ClusterRole::SystemCritical
359 } else if self.name == MZ_CATALOG_SERVER_CLUSTER.name {
360 ClusterRole::System
361 } else {
362 ClusterRole::User
363 }
364 }
365
366 pub fn is_managed(&self) -> bool {
368 matches!(self.config.variant, ClusterVariant::Managed { .. })
369 }
370
371 pub fn user_replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
373 self.replicas().filter(|r| !r.config.location.internal())
374 }
375
376 pub fn replicas(&self) -> impl Iterator<Item = &ClusterReplica> {
378 self.replicas_by_id_.values()
379 }
380
381 pub fn replica(&self, replica_id: ReplicaId) -> Option<&ClusterReplica> {
383 self.replicas_by_id_.get(&replica_id)
384 }
385
386 pub fn replica_id(&self, name: &str) -> Option<ReplicaId> {
388 self.replica_id_by_name_.get(name).copied()
389 }
390
391 pub fn availability_zones(&self) -> Option<&[String]> {
393 match &self.config.variant {
394 ClusterVariant::Managed(managed) => Some(&managed.availability_zones),
395 ClusterVariant::Unmanaged => None,
396 }
397 }
398
399 pub fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
400 let name = self.name.clone();
401 let variant = match &self.config.variant {
402 ClusterVariant::Managed(ClusterVariantManaged {
403 size,
404 availability_zones,
405 logging,
406 replication_factor,
407 optimizer_feature_overrides,
408 schedule,
409 }) => {
410 let introspection = match logging {
411 ReplicaLogging {
412 log_logging,
413 interval: Some(interval),
414 } => Some(ComputeReplicaIntrospectionConfig {
415 debugging: *log_logging,
416 interval: interval.clone(),
417 }),
418 ReplicaLogging {
419 log_logging: _,
420 interval: None,
421 } => None,
422 };
423 let compute = ComputeReplicaConfig { introspection };
424 CreateClusterVariant::Managed(CreateClusterManagedPlan {
425 replication_factor: replication_factor.clone(),
426 size: size.clone(),
427 availability_zones: availability_zones.clone(),
428 compute,
429 optimizer_feature_overrides: optimizer_feature_overrides.clone(),
430 schedule: schedule.clone(),
431 })
432 }
433 ClusterVariant::Unmanaged => {
434 return Err(PlanError::Unsupported {
437 feature: "SHOW CREATE for unmanaged clusters".to_string(),
438 discussion_no: None,
439 });
440 }
441 };
442 let workload_class = self.config.workload_class.clone();
443 Ok(CreateClusterPlan {
444 name,
445 variant,
446 workload_class,
447 })
448 }
449}
450
451impl From<Cluster> for durable::Cluster {
452 fn from(cluster: Cluster) -> durable::Cluster {
453 durable::Cluster {
454 id: cluster.id,
455 name: cluster.name,
456 owner_id: cluster.owner_id,
457 privileges: cluster.privileges.into_all_values().collect(),
458 config: cluster.config.into(),
459 }
460 }
461}
462
463impl From<durable::Cluster> for Cluster {
464 fn from(
465 durable::Cluster {
466 id,
467 name,
468 owner_id,
469 privileges,
470 config,
471 }: durable::Cluster,
472 ) -> Self {
473 Cluster {
474 name: name.clone(),
475 id,
476 bound_objects: BTreeSet::new(),
477 log_indexes: BTreeMap::new(),
478 replica_id_by_name_: BTreeMap::new(),
479 replicas_by_id_: BTreeMap::new(),
480 owner_id,
481 privileges: PrivilegeMap::from_mz_acl_items(privileges),
482 config: config.into(),
483 }
484 }
485}
486
487impl UpdateFrom<durable::Cluster> for Cluster {
488 fn update_from(
489 &mut self,
490 durable::Cluster {
491 id,
492 name,
493 owner_id,
494 privileges,
495 config,
496 }: durable::Cluster,
497 ) {
498 self.id = id;
499 self.name = name;
500 self.owner_id = owner_id;
501 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
502 self.config = config.into();
503 }
504}
505
/// Catalog representation of a single replica of a cluster.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct ClusterReplica {
    /// Name of the replica.
    pub name: String,
    /// Cluster this replica belongs to.
    pub cluster_id: ClusterId,
    /// Identifier of the replica.
    pub replica_id: ReplicaId,
    /// Configuration of the replica.
    pub config: ReplicaConfig,
    /// Role that owns the replica.
    pub owner_id: RoleId,
}
514
515impl From<ClusterReplica> for durable::ClusterReplica {
516 fn from(replica: ClusterReplica) -> durable::ClusterReplica {
517 durable::ClusterReplica {
518 cluster_id: replica.cluster_id,
519 replica_id: replica.replica_id,
520 name: replica.name,
521 config: replica.config.into(),
522 owner_id: replica.owner_id,
523 }
524 }
525}
526
/// A [`ClusterStatus`] paired with a UTC timestamp.
#[derive(Debug, Serialize, Clone, PartialEq, Eq)]
pub struct ClusterReplicaProcessStatus {
    /// The reported status.
    pub status: ClusterStatus,
    /// Timestamp attached to the status. NOTE(review): presumably when the status was
    /// observed — confirm against the code that populates it.
    pub time: DateTime<Utc>,
}
532
/// A collection of [`SourceReference`]s together with the time they were last updated.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReferences {
    /// Time of the last update. NOTE(review): unit/epoch is not visible here — confirm.
    pub updated_at: u64,
    /// The individual references.
    pub references: Vec<SourceReference>,
}
538
/// A single reference: a name, an optional namespace and a list of column names.
#[derive(Debug, Serialize, Clone, PartialEq)]
pub struct SourceReference {
    /// Name of the referenced object.
    pub name: String,
    /// Namespace of the referenced object, if any.
    pub namespace: Option<String>,
    /// Column names of the referenced object.
    pub columns: Vec<String>,
}
545
546impl From<SourceReference> for durable::SourceReference {
547 fn from(source_reference: SourceReference) -> durable::SourceReference {
548 durable::SourceReference {
549 name: source_reference.name,
550 namespace: source_reference.namespace,
551 columns: source_reference.columns,
552 }
553 }
554}
555
556impl SourceReferences {
557 pub fn to_durable(self, source_id: CatalogItemId) -> durable::SourceReferences {
558 durable::SourceReferences {
559 source_id,
560 updated_at: self.updated_at,
561 references: self.references.into_iter().map(Into::into).collect(),
562 }
563 }
564}
565
566impl From<durable::SourceReference> for SourceReference {
567 fn from(source_reference: durable::SourceReference) -> SourceReference {
568 SourceReference {
569 name: source_reference.name,
570 namespace: source_reference.namespace,
571 columns: source_reference.columns,
572 }
573 }
574}
575
576impl From<durable::SourceReferences> for SourceReferences {
577 fn from(source_references: durable::SourceReferences) -> SourceReferences {
578 SourceReferences {
579 updated_at: source_references.updated_at,
580 references: source_references
581 .references
582 .into_iter()
583 .map(|source_reference| source_reference.into())
584 .collect(),
585 }
586 }
587}
588
589impl From<mz_sql::plan::SourceReference> for SourceReference {
590 fn from(source_reference: mz_sql::plan::SourceReference) -> SourceReference {
591 SourceReference {
592 name: source_reference.name,
593 namespace: source_reference.namespace,
594 columns: source_reference.columns,
595 }
596 }
597}
598
599impl From<mz_sql::plan::SourceReferences> for SourceReferences {
600 fn from(source_references: mz_sql::plan::SourceReferences) -> SourceReferences {
601 SourceReferences {
602 updated_at: source_references.updated_at,
603 references: source_references
604 .references
605 .into_iter()
606 .map(|source_reference| source_reference.into())
607 .collect(),
608 }
609 }
610}
611
612impl From<SourceReferences> for mz_sql::plan::SourceReferences {
613 fn from(source_references: SourceReferences) -> mz_sql::plan::SourceReferences {
614 mz_sql::plan::SourceReferences {
615 updated_at: source_references.updated_at,
616 references: source_references
617 .references
618 .into_iter()
619 .map(|source_reference| source_reference.into())
620 .collect(),
621 }
622 }
623}
624
625impl From<SourceReference> for mz_sql::plan::SourceReference {
626 fn from(source_reference: SourceReference) -> mz_sql::plan::SourceReference {
627 mz_sql::plan::SourceReference {
628 name: source_reference.name,
629 namespace: source_reference.namespace,
630 columns: source_reference.columns,
631 }
632 }
633}
634
/// A [`CatalogItem`] together with its catalog metadata: IDs, name, owner, privileges and
/// reverse-dependency lists.
#[derive(Clone, Debug, Serialize)]
pub struct CatalogEntry {
    /// The item itself.
    pub item: CatalogItem,
    /// IDs of items recorded as referencing this one; skipped during serialization.
    #[serde(skip)]
    pub referenced_by: Vec<CatalogItemId>,
    /// IDs of items recorded as using this one; skipped during serialization.
    /// NOTE(review): the exact distinction from `referenced_by` is not visible here — confirm.
    #[serde(skip)]
    pub used_by: Vec<CatalogItemId>,
    /// Identifier of the entry.
    pub id: CatalogItemId,
    /// OID of the entry.
    pub oid: u32,
    /// Qualified name of the entry.
    pub name: QualifiedItemName,
    /// Role that owns the entry.
    pub owner_id: RoleId,
    /// Privileges recorded for the entry.
    pub privileges: PrivilegeMap,
}
651
/// A [`CatalogEntry`] paired with a [`RelationVersionSelector`]. The selector is used when
/// resolving the entry's description and, for tables, its [`GlobalId`].
#[derive(Clone, Debug)]
pub struct CatalogCollectionEntry {
    /// The underlying catalog entry.
    pub entry: CatalogEntry,
    /// Which version of the relation this value refers to.
    pub version: RelationVersionSelector,
}
674
675impl CatalogCollectionEntry {
676 pub fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
677 self.item().desc(name, self.version)
678 }
679
680 pub fn desc_opt(&self) -> Option<Cow<'_, RelationDesc>> {
681 self.item().desc_opt(self.version)
682 }
683}
684
685impl mz_sql::catalog::CatalogCollectionItem for CatalogCollectionEntry {
686 fn desc(&self, name: &FullItemName) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
687 CatalogCollectionEntry::desc(self, name)
688 }
689
690 fn global_id(&self) -> GlobalId {
691 match self.entry.item() {
692 CatalogItem::Source(source) => source.global_id,
693 CatalogItem::Log(log) => log.global_id,
694 CatalogItem::View(view) => view.global_id,
695 CatalogItem::MaterializedView(mv) => mv.global_id,
696 CatalogItem::Sink(sink) => sink.global_id,
697 CatalogItem::Index(index) => index.global_id,
698 CatalogItem::Type(ty) => ty.global_id,
699 CatalogItem::Func(func) => func.global_id,
700 CatalogItem::Secret(secret) => secret.global_id,
701 CatalogItem::Connection(conn) => conn.global_id,
702 CatalogItem::ContinualTask(ct) => ct.global_id,
703 CatalogItem::Table(table) => match self.version {
704 RelationVersionSelector::Latest => {
705 let (_version, gid) = table
706 .collections
707 .last_key_value()
708 .expect("at least one version");
709 *gid
710 }
711 RelationVersionSelector::Specific(version) => table
712 .collections
713 .get(&version)
714 .expect("catalog corruption, missing version!")
715 .clone(),
716 },
717 }
718 }
719}
720
721impl Deref for CatalogCollectionEntry {
722 type Target = CatalogEntry;
723
724 fn deref(&self) -> &CatalogEntry {
725 &self.entry
726 }
727}
728
/// Implements the SQL-layer catalog item trait by forwarding every method to the wrapped
/// [`CatalogEntry`]. Only `at_version` involves the version selector.
impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
    /// Forwards to the wrapped entry.
    fn name(&self) -> &QualifiedItemName {
        self.entry.name()
    }

    /// Forwards to the wrapped entry.
    fn id(&self) -> CatalogItemId {
        self.entry.id()
    }

    /// Forwards to the wrapped entry, boxing the returned iterator.
    fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
        Box::new(self.entry.global_ids())
    }

    /// Forwards to the wrapped entry.
    fn oid(&self) -> u32 {
        self.entry.oid()
    }

    /// Forwards to the wrapped entry.
    fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
        self.entry.func()
    }

    /// Forwards to the wrapped entry.
    fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
        self.entry.source_desc()
    }

    /// Forwards to the wrapped entry, explicitly through the
    /// `mz_sql::catalog::CatalogItem` trait method.
    fn connection(
        &self,
    ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
    {
        mz_sql::catalog::CatalogItem::connection(&self.entry)
    }

    /// Forwards to the wrapped entry.
    fn create_sql(&self) -> &str {
        self.entry.create_sql()
    }

    /// Forwards to the wrapped entry.
    fn item_type(&self) -> SqlCatalogItemType {
        self.entry.item_type()
    }

    /// Forwards to the wrapped entry.
    fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
        self.entry.index_details()
    }

    /// Forwards to the wrapped entry.
    fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
        self.entry.writable_table_details()
    }

    /// Forwards to the wrapped entry.
    fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
        self.entry.type_details()
    }

    /// Forwards to the wrapped entry.
    fn references(&self) -> &ResolvedIds {
        self.entry.references()
    }

    /// Forwards to the wrapped entry.
    fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.entry.uses()
    }

    /// Forwards to the wrapped entry.
    fn referenced_by(&self) -> &[CatalogItemId] {
        self.entry.referenced_by()
    }

    /// Forwards to the wrapped entry.
    fn used_by(&self) -> &[CatalogItemId] {
        self.entry.used_by()
    }

    /// Forwards to the wrapped entry.
    fn subsource_details(
        &self,
    ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
        self.entry.subsource_details()
    }

    /// Forwards to the wrapped entry.
    fn source_export_details(
        &self,
    ) -> Option<(
        CatalogItemId,
        &UnresolvedItemName,
        &SourceExportDetails,
        &SourceExportDataConfig<ReferencedConnection>,
    )> {
        self.entry.source_export_details()
    }

    /// Forwards to the wrapped entry.
    fn is_progress_source(&self) -> bool {
        self.entry.is_progress_source()
    }

    /// Forwards to the wrapped entry.
    fn progress_id(&self) -> Option<CatalogItemId> {
        self.entry.progress_id()
    }

    /// Forwards to the wrapped entry; the entry hands back a reference, which is copied out.
    fn owner_id(&self) -> RoleId {
        *self.entry.owner_id()
    }

    /// Forwards to the wrapped entry.
    fn privileges(&self) -> &PrivilegeMap {
        self.entry.privileges()
    }

    /// Forwards to the item inside the wrapped entry.
    fn cluster_id(&self) -> Option<ClusterId> {
        self.entry.item().cluster_id()
    }

    /// Returns a new boxed [`CatalogCollectionEntry`] holding a clone of the wrapped entry
    /// and the requested `version` (the current `self.version` is not consulted).
    fn at_version(
        &self,
        version: RelationVersionSelector,
    ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
        Box::new(CatalogCollectionEntry {
            entry: self.entry.clone(),
            version,
        })
    }

    /// Forwards to the wrapped entry.
    fn latest_version(&self) -> Option<RelationVersion> {
        self.entry.latest_version()
    }
}
848
/// The kinds of item that can be stored in the catalog, each wrapping its own payload type.
#[derive(Debug, Clone, Serialize)]
pub enum CatalogItem {
    /// A table; see [`Table`].
    Table(Table),
    /// A source; see [`Source`].
    Source(Source),
    /// A log; see [`Log`].
    Log(Log),
    /// A view; see [`View`].
    View(View),
    /// A materialized view; see [`MaterializedView`].
    MaterializedView(MaterializedView),
    /// A sink; see [`Sink`].
    Sink(Sink),
    /// An index; see [`Index`].
    Index(Index),
    /// A type; see [`Type`].
    Type(Type),
    /// A function; see [`Func`].
    Func(Func),
    /// A secret; see [`Secret`].
    Secret(Secret),
    /// A connection; see [`Connection`].
    Connection(Connection),
    /// A continual task; see [`ContinualTask`].
    ContinualTask(ContinualTask),
}
864
865impl From<CatalogEntry> for durable::Item {
866 fn from(entry: CatalogEntry) -> durable::Item {
867 let (create_sql, global_id, extra_versions) = entry.item.into_serialized();
868 durable::Item {
869 id: entry.id,
870 oid: entry.oid,
871 global_id,
872 schema_id: entry.name.qualifiers.schema_spec.into(),
873 name: entry.name.item,
874 create_sql,
875 owner_id: entry.owner_id,
876 privileges: entry.privileges.into_all_values().collect(),
877 extra_versions,
878 }
879 }
880}
881
/// Catalog payload for a table.
#[derive(Debug, Clone, Serialize)]
pub struct Table {
    /// SQL text that created the table, if available.
    pub create_sql: Option<String>,
    /// Versioned description of the table's relation.
    pub desc: VersionedRelationDesc,
    /// The [`GlobalId`] for each [`RelationVersion`] of the table; serialized via
    /// `mz_ore::serde::map_key_to_string`. Accessors in `impl Table` expect at least one entry.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collections: BTreeMap<RelationVersion, GlobalId>,
    /// Connection associated with the table, if any; skipped during serialization.
    /// NOTE(review): presumably set for session-scoped (temporary) tables — confirm.
    #[serde(skip)]
    pub conn_id: Option<ConnectionId>,
    /// IDs resolved while planning the table.
    pub resolved_ids: ResolvedIds,
    /// Custom logical compaction window, if one is set.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the table is flagged as a retained-metrics object.
    pub is_retained_metrics_object: bool,
    /// Where the table's data comes from.
    pub data_source: TableDataSource,
}
906
907impl Table {
908 pub fn timeline(&self) -> Timeline {
909 match &self.data_source {
910 TableDataSource::TableWrites { .. } => Timeline::EpochMilliseconds,
913 TableDataSource::DataSource { timeline, .. } => timeline.clone(),
914 }
915 }
916
917 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
919 self.collections.values().copied()
920 }
921
922 pub fn global_id_writes(&self) -> GlobalId {
924 self.collections
925 .last_key_value()
926 .expect("at least one version of a table")
927 .1
928 .clone()
929 }
930
931 pub fn collection_descs(
933 &self,
934 ) -> impl Iterator<Item = (GlobalId, RelationVersion, RelationDesc)> + '_ {
935 self.collections.iter().map(|(version, gid)| {
936 let desc = self
937 .desc
938 .at_version(RelationVersionSelector::Specific(*version));
939 (*gid, *version, desc)
940 })
941 }
942
943 pub fn desc_for(&self, id: &GlobalId) -> RelationDesc {
945 let (version, _gid) = self
946 .collections
947 .iter()
948 .find(|(_version, gid)| *gid == id)
949 .expect("GlobalId to exist");
950 self.desc
951 .at_version(RelationVersionSelector::Specific(*version))
952 }
953}
954
/// Where a [`Table`]'s data comes from.
#[derive(Clone, Debug, Serialize)]
pub enum TableDataSource {
    /// The table receives writes directly.
    TableWrites {
        /// Default expressions; skipped during serialization.
        /// NOTE(review): presumably one per column — confirm.
        #[serde(skip)]
        defaults: Vec<Expr<Aug>>,
    },

    /// The table is backed by a data source.
    DataSource {
        /// Description of the backing data source.
        desc: DataSourceDesc,
        /// Timeline of the backing data source; returned by `Table::timeline`.
        timeline: Timeline,
    },
}
970
/// The kinds of data source that can back a [`Source`] or a [`Table`].
#[derive(Debug, Clone, Serialize)]
pub enum DataSourceDesc {
    /// An ingestion.
    Ingestion {
        /// The planner's description of the ingestion.
        ingestion_desc: PlanIngestion,
        /// Cluster recorded for the ingestion.
        cluster_id: ClusterId,
    },
    /// An export of an ingestion (reported as "subsource" by `Source::source_type`).
    IngestionExport {
        /// Item ID of the ingestion this export belongs to.
        ingestion_id: CatalogItemId,
        /// Name of the exported object in the external system.
        external_reference: UnresolvedItemName,
        /// Details of the export.
        details: SourceExportDetails,
        /// Data configuration of the export; holds its encoding and envelope.
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// An introspection collection of the given type.
    Introspection(IntrospectionType),
    /// A progress collection.
    Progress,
    /// A webhook.
    Webhook {
        /// Validation to apply, if any.
        validate_using: Option<WebhookValidation>,
        /// Format of the request body.
        body_format: WebhookBodyFormat,
        /// Header handling configuration.
        headers: WebhookHeaders,
        /// Cluster recorded for the webhook.
        cluster_id: ClusterId,
    },
}
1007
1008impl DataSourceDesc {
1009 pub fn formats(&self) -> (Option<&str>, Option<&str>) {
1011 match &self {
1012 DataSourceDesc::Ingestion { ingestion_desc, .. } => {
1013 match &ingestion_desc.desc.primary_export.encoding.as_ref() {
1014 Some(encoding) => match &encoding.key {
1015 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1016 None => (None, Some(encoding.value.type_())),
1017 },
1018 None => (None, None),
1019 }
1020 }
1021 DataSourceDesc::IngestionExport { data_config, .. } => match &data_config.encoding {
1022 Some(encoding) => match &encoding.key {
1023 Some(key) => (Some(key.type_()), Some(encoding.value.type_())),
1024 None => (None, Some(encoding.value.type_())),
1025 },
1026 None => (None, None),
1027 },
1028 DataSourceDesc::Introspection(_)
1029 | DataSourceDesc::Webhook { .. }
1030 | DataSourceDesc::Progress => (None, None),
1031 }
1032 }
1033
1034 pub fn envelope(&self) -> Option<&str> {
1036 fn envelope_string(envelope: &SourceEnvelope) -> &str {
1041 match envelope {
1042 SourceEnvelope::None(_) => "none",
1043 SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
1044 mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
1045 mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
1046 "debezium"
1050 }
1051 mz_storage_types::sources::envelope::UpsertStyle::ValueErrInline { .. } => {
1052 "upsert-value-err-inline"
1053 }
1054 },
1055 SourceEnvelope::CdcV2 => {
1056 "materialize"
1059 }
1060 }
1061 }
1062
1063 match self {
1064 DataSourceDesc::Ingestion { ingestion_desc, .. } => Some(envelope_string(
1069 &ingestion_desc.desc.primary_export.envelope,
1070 )),
1071 DataSourceDesc::IngestionExport { data_config, .. } => {
1072 Some(envelope_string(&data_config.envelope))
1073 }
1074 DataSourceDesc::Introspection(_)
1075 | DataSourceDesc::Webhook { .. }
1076 | DataSourceDesc::Progress => None,
1077 }
1078 }
1079}
1080
/// Catalog payload for a source.
#[derive(Debug, Clone, Serialize)]
pub struct Source {
    /// SQL text that created the source, if available.
    pub create_sql: Option<String>,
    /// [`GlobalId`] of the source.
    pub global_id: GlobalId,
    /// Where the source's data comes from; skipped during serialization.
    #[serde(skip)]
    pub data_source: DataSourceDesc,
    /// Description of the source's relation.
    pub desc: RelationDesc,
    /// Timeline of the source.
    pub timeline: Timeline,
    /// IDs resolved while planning the source.
    pub resolved_ids: ResolvedIds,
    /// Custom logical compaction window, if one is set.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Whether the source is flagged as a retained-metrics object.
    pub is_retained_metrics_object: bool,
}
1104
1105impl Source {
1106 pub fn new(
1113 plan: CreateSourcePlan,
1114 global_id: GlobalId,
1115 resolved_ids: ResolvedIds,
1116 custom_logical_compaction_window: Option<CompactionWindow>,
1117 is_retained_metrics_object: bool,
1118 ) -> Source {
1119 Source {
1120 create_sql: Some(plan.source.create_sql),
1121 data_source: match plan.source.data_source {
1122 mz_sql::plan::DataSourceDesc::Ingestion(ingestion_desc) => {
1123 DataSourceDesc::Ingestion {
1124 ingestion_desc,
1125 cluster_id: plan
1126 .in_cluster
1127 .expect("ingestion-based sources must be given a cluster ID"),
1128 }
1129 }
1130 mz_sql::plan::DataSourceDesc::Progress => {
1131 assert!(
1132 plan.in_cluster.is_none(),
1133 "subsources must not have a host config or cluster_id defined"
1134 );
1135 DataSourceDesc::Progress
1136 }
1137 mz_sql::plan::DataSourceDesc::IngestionExport {
1138 ingestion_id,
1139 external_reference,
1140 details,
1141 data_config,
1142 } => {
1143 assert!(
1144 plan.in_cluster.is_none(),
1145 "subsources must not have a host config or cluster_id defined"
1146 );
1147 DataSourceDesc::IngestionExport {
1148 ingestion_id,
1149 external_reference,
1150 details,
1151 data_config,
1152 }
1153 }
1154 mz_sql::plan::DataSourceDesc::Webhook {
1155 validate_using,
1156 body_format,
1157 headers,
1158 cluster_id,
1159 } => {
1160 mz_ore::soft_assert_or_log!(
1161 cluster_id.is_none(),
1162 "cluster_id set at Source level for Webhooks"
1163 );
1164 DataSourceDesc::Webhook {
1165 validate_using,
1166 body_format,
1167 headers,
1168 cluster_id: plan
1169 .in_cluster
1170 .expect("webhook sources must be given a cluster ID"),
1171 }
1172 }
1173 },
1174 desc: plan.source.desc,
1175 global_id,
1176 timeline: plan.timeline,
1177 resolved_ids,
1178 custom_logical_compaction_window: plan
1179 .source
1180 .compaction_window
1181 .or(custom_logical_compaction_window),
1182 is_retained_metrics_object,
1183 }
1184 }
1185
1186 pub fn source_type(&self) -> &str {
1188 match &self.data_source {
1189 DataSourceDesc::Ingestion { ingestion_desc, .. } => {
1190 ingestion_desc.desc.connection.name()
1191 }
1192 DataSourceDesc::Progress => "progress",
1193 DataSourceDesc::IngestionExport { .. } => "subsource",
1194 DataSourceDesc::Introspection(_) => "source",
1195 DataSourceDesc::Webhook { .. } => "webhook",
1196 }
1197 }
1198
1199 pub fn connection_id(&self) -> Option<CatalogItemId> {
1201 match &self.data_source {
1202 DataSourceDesc::Ingestion { ingestion_desc, .. } => {
1203 ingestion_desc.desc.connection.connection_id()
1204 }
1205 DataSourceDesc::IngestionExport { .. }
1206 | DataSourceDesc::Introspection(_)
1207 | DataSourceDesc::Webhook { .. }
1208 | DataSourceDesc::Progress => None,
1209 }
1210 }
1211
1212 pub fn global_id(&self) -> GlobalId {
1214 self.global_id
1215 }
1216
1217 pub fn user_controllable_persist_shard_count(&self) -> i64 {
1225 match &self.data_source {
1226 DataSourceDesc::Ingestion { ingestion_desc, .. } => {
1227 match &ingestion_desc.desc.connection {
1228 GenericSourceConnection::Postgres(_)
1232 | GenericSourceConnection::MySql(_)
1233 | GenericSourceConnection::SqlServer(_) => 0,
1234 GenericSourceConnection::LoadGenerator(lg) => {
1235 if lg.load_generator.views().is_empty() {
1237 1
1239 } else {
1240 0
1243 }
1244 }
1245 GenericSourceConnection::Kafka(_) => 1,
1246 }
1247 }
1248 DataSourceDesc::IngestionExport { .. } => 1,
1251 DataSourceDesc::Webhook { .. } => 1,
1252 DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => 0,
1255 }
1256 }
1257}
1258
/// Catalog payload for a log.
#[derive(Debug, Clone, Serialize)]
pub struct Log {
    /// Which log this is.
    pub variant: LogVariant,
    /// [`GlobalId`] of the log.
    pub global_id: GlobalId,
}
1266
impl Log {
    /// Returns the [`GlobalId`] of the log.
    pub fn global_id(&self) -> GlobalId {
        self.global_id
    }
}
1273
/// Catalog payload for a sink.
#[derive(Debug, Clone, Serialize)]
pub struct Sink {
    /// SQL text that created the sink.
    pub create_sql: String,
    /// [`GlobalId`] of the sink.
    pub global_id: GlobalId,
    /// [`GlobalId`] of the collection the sink reads from.
    pub from: GlobalId,
    /// Connection to the external system the sink writes to.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// Envelope used by the sink.
    pub envelope: SinkEnvelope,
    /// Snapshot flag. NOTE(review): presumably whether an initial snapshot is emitted —
    /// confirm.
    pub with_snapshot: bool,
    /// Version number of the sink. NOTE(review): what bumps it is not visible here — confirm.
    pub version: u64,
    /// IDs resolved while planning the sink.
    pub resolved_ids: ResolvedIds,
    /// Cluster recorded for the sink.
    pub cluster_id: ClusterId,
}
1297
1298impl Sink {
1299 pub fn sink_type(&self) -> &str {
1300 self.connection.name()
1301 }
1302
1303 pub fn envelope(&self) -> Option<&str> {
1305 match &self.envelope {
1306 SinkEnvelope::Debezium => Some("debezium"),
1307 SinkEnvelope::Upsert => Some("upsert"),
1308 }
1309 }
1310
1311 pub fn combined_format(&self) -> Cow<'_, str> {
1316 let StorageSinkConnection::Kafka(connection) = &self.connection;
1317 connection.format.get_format_name()
1318 }
1319
1320 pub fn formats(&self) -> (Option<&str>, &str) {
1322 let StorageSinkConnection::Kafka(connection) = &self.connection;
1323 let key_format = connection
1324 .format
1325 .key_format
1326 .as_ref()
1327 .map(|format| format.get_format_name());
1328 let value_format = connection.format.value_format.get_format_name();
1329 (key_format, value_format)
1330 }
1331
1332 pub fn connection_id(&self) -> Option<CatalogItemId> {
1333 self.connection.connection_id()
1334 }
1335
1336 pub fn global_id(&self) -> GlobalId {
1338 self.global_id
1339 }
1340}
1341
/// Catalog payload for a view.
#[derive(Debug, Clone, Serialize)]
pub struct View {
    /// SQL text that created the view.
    pub create_sql: String,
    /// [`GlobalId`] of the view.
    pub global_id: GlobalId,
    /// The view's expression in HIR form, shared behind an [`Arc`].
    pub raw_expr: Arc<HirRelationExpr>,
    /// The view's expression in optimized MIR form, shared behind an [`Arc`].
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Description of the view's relation.
    pub desc: RelationDesc,
    /// Connection associated with the view, if any.
    /// NOTE(review): presumably set for session-scoped (temporary) views — confirm.
    pub conn_id: Option<ConnectionId>,
    /// IDs resolved while planning the view.
    pub resolved_ids: ResolvedIds,
    /// IDs the view depends on.
    pub dependencies: DependencyIds,
}
1361
1362impl View {
1363 pub fn global_id(&self) -> GlobalId {
1365 self.global_id
1366 }
1367}
1368
/// Catalog representation of a materialized view.
#[derive(Debug, Clone, Serialize)]
pub struct MaterializedView {
    /// Parseable `CREATE` SQL for the materialized view; rewritten on renames and emitted on
    /// serialization.
    pub create_sql: String,
    /// The global identifier of this materialized view.
    pub global_id: GlobalId,
    /// The expression in `HirRelationExpr` form.
    /// NOTE(review): presumably the unoptimized query plan — confirm.
    pub raw_expr: Arc<HirRelationExpr>,
    /// The expression in `OptimizedMirRelationExpr` form.
    pub optimized_expr: Arc<OptimizedMirRelationExpr>,
    /// Relation description (columns and types) of the materialized view.
    pub desc: RelationDesc,
    /// Catalog objects referenced by this materialized view's definition.
    pub resolved_ids: ResolvedIds,
    /// Additional item ids that `CatalogItem::uses` reports on top of `resolved_ids`.
    pub dependencies: DependencyIds,
    /// The cluster reported for this materialized view by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// NOTE(review): presumably indexes of columns asserted to be non-null — confirm.
    pub non_null_assertions: Vec<usize>,
    /// Custom compaction window, if one was set; updated by
    /// `CatalogItem::update_retain_history`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// NOTE(review): presumably the refresh schedule, if the view is not refreshed on commit —
    /// confirm.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// NOTE(review): presumably the as-of frontier chosen when the view was first created —
    /// confirm.
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1401
1402impl MaterializedView {
1403 pub fn global_id(&self) -> GlobalId {
1405 self.global_id
1406 }
1407}
1408
/// Catalog representation of an index.
#[derive(Debug, Clone, Serialize)]
pub struct Index {
    /// Parseable `CREATE` SQL for the index; rewritten on renames and emitted on serialization.
    pub create_sql: String,
    /// The global identifier of this index.
    pub global_id: GlobalId,
    /// NOTE(review): presumably the collection this index is built on — confirm.
    pub on: GlobalId,
    /// The key expressions of the index.
    pub keys: Arc<[MirScalarExpr]>,
    /// Connection this index is tied to, if any; a `Some` value makes the item temporary
    /// according to `CatalogItem::is_temporary`.
    pub conn_id: Option<ConnectionId>,
    /// Catalog objects referenced by this index's definition.
    pub resolved_ids: ResolvedIds,
    /// The cluster reported for this index by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// Custom compaction window, if one was set; updated by
    /// `CatalogItem::update_retain_history`.
    pub custom_logical_compaction_window: Option<CompactionWindow>,
    /// Flag surfaced by `CatalogItem::is_retained_metrics_object`.
    pub is_retained_metrics_object: bool,
}
1433
1434impl Index {
1435 pub fn global_id(&self) -> GlobalId {
1437 self.global_id
1438 }
1439}
1440
/// Catalog representation of a type.
#[derive(Debug, Clone, Serialize)]
pub struct Type {
    /// Parseable `CREATE` SQL for the type, when there is one. Serializing a type without it
    /// panics with "builtin types cannot be serialized".
    pub create_sql: Option<String>,
    /// The global identifier of this type.
    pub global_id: GlobalId,
    /// Details of the type; excluded from serde serialization.
    #[serde(skip)]
    pub details: CatalogTypeDetails<IdReference>,
    /// Relation description of the type, for types that have one.
    pub desc: Option<RelationDesc>,
    /// Catalog objects referenced by this type's definition.
    pub resolved_ids: ResolvedIds,
}
1453
/// Catalog representation of a function.
#[derive(Debug, Clone, Serialize)]
pub struct Func {
    /// The statically allocated function definition; excluded from serde serialization.
    #[serde(skip)]
    pub inner: &'static mz_sql::func::Func,
    /// The global identifier of this function.
    pub global_id: GlobalId,
}
1462
/// Catalog representation of a secret.
#[derive(Debug, Clone, Serialize)]
pub struct Secret {
    /// Parseable `CREATE` SQL for the secret; rewritten on renames and emitted on serialization.
    pub create_sql: String,
    /// The global identifier of this secret.
    pub global_id: GlobalId,
}
1470
/// Catalog representation of a connection.
#[derive(Debug, Clone, Serialize)]
pub struct Connection {
    /// Parseable `CREATE` SQL for the connection; rewritten on renames and emitted on
    /// serialization.
    pub create_sql: String,
    /// The global identifier of this connection.
    pub global_id: GlobalId,
    /// The planned details of the connection.
    pub details: ConnectionDetails,
    /// Catalog objects referenced by this connection's definition.
    pub resolved_ids: ResolvedIds,
}
1482
1483impl Connection {
1484 pub fn global_id(&self) -> GlobalId {
1486 self.global_id
1487 }
1488}
1489
/// Catalog representation of a continual task.
#[derive(Debug, Clone, Serialize)]
pub struct ContinualTask {
    /// Parseable `CREATE` SQL for the continual task; rewritten on renames and emitted on
    /// serialization.
    pub create_sql: String,
    /// The global identifier of this continual task.
    pub global_id: GlobalId,
    /// NOTE(review): presumably the collection whose changes drive the task — confirm.
    pub input_id: GlobalId,
    /// NOTE(review): presumably whether the task processes an initial snapshot of its input —
    /// confirm.
    pub with_snapshot: bool,
    /// The task's expression in `HirRelationExpr` form.
    pub raw_expr: Arc<HirRelationExpr>,
    /// Relation description (columns and types) of the continual task.
    pub desc: RelationDesc,
    /// Catalog objects referenced by this continual task's definition.
    pub resolved_ids: ResolvedIds,
    /// Additional item ids that `CatalogItem::uses` reports on top of `resolved_ids`.
    pub dependencies: DependencyIds,
    /// The cluster reported for this continual task by `CatalogItem::cluster_id`.
    pub cluster_id: ClusterId,
    /// NOTE(review): presumably the as-of frontier chosen when the task was first created —
    /// confirm.
    pub initial_as_of: Option<Antichain<mz_repr::Timestamp>>,
}
1515
1516impl ContinualTask {
1517 pub fn global_id(&self) -> GlobalId {
1519 self.global_id
1520 }
1521}
1522
/// In-memory representation of a network policy; convertible to and from
/// `durable::NetworkPolicy`.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct NetworkPolicy {
    /// Name of the policy.
    pub name: String,
    /// Identifier of the policy.
    pub id: NetworkPolicyId,
    /// Numeric object identifier (`oid`) of the policy.
    pub oid: u32,
    /// The rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
    /// Role that owns the policy.
    pub owner_id: RoleId,
    /// Privileges on the policy; flattened to ACL items in the durable representation.
    pub privileges: PrivilegeMap,
}
1532
1533impl From<NetworkPolicy> for durable::NetworkPolicy {
1534 fn from(policy: NetworkPolicy) -> durable::NetworkPolicy {
1535 durable::NetworkPolicy {
1536 id: policy.id,
1537 oid: policy.oid,
1538 name: policy.name,
1539 rules: policy.rules,
1540 owner_id: policy.owner_id,
1541 privileges: policy.privileges.into_all_values().collect(),
1542 }
1543 }
1544}
1545
1546impl From<durable::NetworkPolicy> for NetworkPolicy {
1547 fn from(
1548 durable::NetworkPolicy {
1549 id,
1550 oid,
1551 name,
1552 rules,
1553 owner_id,
1554 privileges,
1555 }: durable::NetworkPolicy,
1556 ) -> Self {
1557 NetworkPolicy {
1558 id,
1559 oid,
1560 name,
1561 rules,
1562 owner_id,
1563 privileges: PrivilegeMap::from_mz_acl_items(privileges),
1564 }
1565 }
1566}
1567
1568impl UpdateFrom<durable::NetworkPolicy> for NetworkPolicy {
1569 fn update_from(
1570 &mut self,
1571 durable::NetworkPolicy {
1572 id,
1573 oid,
1574 name,
1575 rules,
1576 owner_id,
1577 privileges,
1578 }: durable::NetworkPolicy,
1579 ) {
1580 self.id = id;
1581 self.oid = oid;
1582 self.name = name;
1583 self.rules = rules;
1584 self.owner_id = owner_id;
1585 self.privileges = PrivilegeMap::from_mz_acl_items(privileges);
1586 }
1587}
1588
1589impl CatalogItem {
1590 pub fn typ(&self) -> mz_sql::catalog::CatalogItemType {
1592 match self {
1593 CatalogItem::Table(_) => mz_sql::catalog::CatalogItemType::Table,
1594 CatalogItem::Source(_) => mz_sql::catalog::CatalogItemType::Source,
1595 CatalogItem::Log(_) => mz_sql::catalog::CatalogItemType::Source,
1596 CatalogItem::Sink(_) => mz_sql::catalog::CatalogItemType::Sink,
1597 CatalogItem::View(_) => mz_sql::catalog::CatalogItemType::View,
1598 CatalogItem::MaterializedView(_) => mz_sql::catalog::CatalogItemType::MaterializedView,
1599 CatalogItem::Index(_) => mz_sql::catalog::CatalogItemType::Index,
1600 CatalogItem::Type(_) => mz_sql::catalog::CatalogItemType::Type,
1601 CatalogItem::Func(_) => mz_sql::catalog::CatalogItemType::Func,
1602 CatalogItem::Secret(_) => mz_sql::catalog::CatalogItemType::Secret,
1603 CatalogItem::Connection(_) => mz_sql::catalog::CatalogItemType::Connection,
1604 CatalogItem::ContinualTask(_) => mz_sql::catalog::CatalogItemType::ContinualTask,
1605 }
1606 }
1607
1608 pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
1610 let gid = match self {
1611 CatalogItem::Source(source) => source.global_id,
1612 CatalogItem::Log(log) => log.global_id,
1613 CatalogItem::Sink(sink) => sink.global_id,
1614 CatalogItem::View(view) => view.global_id,
1615 CatalogItem::MaterializedView(mv) => mv.global_id,
1616 CatalogItem::ContinualTask(ct) => ct.global_id,
1617 CatalogItem::Index(index) => index.global_id,
1618 CatalogItem::Func(func) => func.global_id,
1619 CatalogItem::Type(ty) => ty.global_id,
1620 CatalogItem::Secret(secret) => secret.global_id,
1621 CatalogItem::Connection(conn) => conn.global_id,
1622 CatalogItem::Table(table) => {
1623 return itertools::Either::Left(table.collections.values().copied());
1624 }
1625 };
1626 itertools::Either::Right(std::iter::once(gid))
1627 }
1628
1629 pub fn latest_global_id(&self) -> GlobalId {
1633 match self {
1634 CatalogItem::Source(source) => source.global_id,
1635 CatalogItem::Log(log) => log.global_id,
1636 CatalogItem::Sink(sink) => sink.global_id,
1637 CatalogItem::View(view) => view.global_id,
1638 CatalogItem::MaterializedView(mv) => mv.global_id,
1639 CatalogItem::ContinualTask(ct) => ct.global_id,
1640 CatalogItem::Index(index) => index.global_id,
1641 CatalogItem::Func(func) => func.global_id,
1642 CatalogItem::Type(ty) => ty.global_id,
1643 CatalogItem::Secret(secret) => secret.global_id,
1644 CatalogItem::Connection(conn) => conn.global_id,
1645 CatalogItem::Table(table) => table.global_id_writes(),
1646 }
1647 }
1648
1649 pub fn is_storage_collection(&self) -> bool {
1651 match self {
1652 CatalogItem::Table(_)
1653 | CatalogItem::Source(_)
1654 | CatalogItem::MaterializedView(_)
1655 | CatalogItem::Sink(_)
1656 | CatalogItem::ContinualTask(_) => true,
1657 CatalogItem::Log(_)
1658 | CatalogItem::View(_)
1659 | CatalogItem::Index(_)
1660 | CatalogItem::Type(_)
1661 | CatalogItem::Func(_)
1662 | CatalogItem::Secret(_)
1663 | CatalogItem::Connection(_) => false,
1664 }
1665 }
1666
1667 pub fn desc(
1668 &self,
1669 name: &FullItemName,
1670 version: RelationVersionSelector,
1671 ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
1672 self.desc_opt(version)
1673 .ok_or_else(|| SqlCatalogError::InvalidDependency {
1674 name: name.to_string(),
1675 typ: self.typ(),
1676 })
1677 }
1678
1679 pub fn desc_opt(&self, version: RelationVersionSelector) -> Option<Cow<'_, RelationDesc>> {
1680 match &self {
1681 CatalogItem::Source(src) => Some(Cow::Borrowed(&src.desc)),
1682 CatalogItem::Log(log) => Some(Cow::Owned(log.variant.desc())),
1683 CatalogItem::Table(tbl) => Some(Cow::Owned(tbl.desc.at_version(version))),
1684 CatalogItem::View(view) => Some(Cow::Borrowed(&view.desc)),
1685 CatalogItem::MaterializedView(mview) => Some(Cow::Borrowed(&mview.desc)),
1686 CatalogItem::Type(typ) => typ.desc.as_ref().map(Cow::Borrowed),
1687 CatalogItem::ContinualTask(ct) => Some(Cow::Borrowed(&ct.desc)),
1688 CatalogItem::Func(_)
1689 | CatalogItem::Index(_)
1690 | CatalogItem::Sink(_)
1691 | CatalogItem::Secret(_)
1692 | CatalogItem::Connection(_) => None,
1693 }
1694 }
1695
1696 pub fn func(
1697 &self,
1698 entry: &CatalogEntry,
1699 ) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
1700 match &self {
1701 CatalogItem::Func(func) => Ok(func.inner),
1702 _ => Err(SqlCatalogError::UnexpectedType {
1703 name: entry.name().item.to_string(),
1704 actual_type: entry.item_type(),
1705 expected_type: CatalogItemType::Func,
1706 }),
1707 }
1708 }
1709
1710 pub fn source_desc(
1711 &self,
1712 entry: &CatalogEntry,
1713 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
1714 match &self {
1715 CatalogItem::Source(source) => match &source.data_source {
1716 DataSourceDesc::Ingestion { ingestion_desc, .. } => Ok(Some(&ingestion_desc.desc)),
1717 DataSourceDesc::IngestionExport { .. }
1718 | DataSourceDesc::Introspection(_)
1719 | DataSourceDesc::Webhook { .. }
1720 | DataSourceDesc::Progress => Ok(None),
1721 },
1722 _ => Err(SqlCatalogError::UnexpectedType {
1723 name: entry.name().item.to_string(),
1724 actual_type: entry.item_type(),
1725 expected_type: CatalogItemType::Source,
1726 }),
1727 }
1728 }
1729
1730 pub fn is_progress_source(&self) -> bool {
1732 matches!(
1733 self,
1734 CatalogItem::Source(Source {
1735 data_source: DataSourceDesc::Progress,
1736 ..
1737 })
1738 )
1739 }
1740
1741 pub fn references(&self) -> &ResolvedIds {
1744 static EMPTY: LazyLock<ResolvedIds> = LazyLock::new(ResolvedIds::empty);
1745 match self {
1746 CatalogItem::Func(_) => &*EMPTY,
1747 CatalogItem::Index(idx) => &idx.resolved_ids,
1748 CatalogItem::Sink(sink) => &sink.resolved_ids,
1749 CatalogItem::Source(source) => &source.resolved_ids,
1750 CatalogItem::Log(_) => &*EMPTY,
1751 CatalogItem::Table(table) => &table.resolved_ids,
1752 CatalogItem::Type(typ) => &typ.resolved_ids,
1753 CatalogItem::View(view) => &view.resolved_ids,
1754 CatalogItem::MaterializedView(mview) => &mview.resolved_ids,
1755 CatalogItem::Secret(_) => &*EMPTY,
1756 CatalogItem::Connection(connection) => &connection.resolved_ids,
1757 CatalogItem::ContinualTask(ct) => &ct.resolved_ids,
1758 }
1759 }
1760
1761 pub fn uses(&self) -> BTreeSet<CatalogItemId> {
1767 let mut uses: BTreeSet<_> = self.references().items().copied().collect();
1768 match self {
1769 CatalogItem::Func(_) => {}
1772 CatalogItem::Index(_) => {}
1773 CatalogItem::Sink(_) => {}
1774 CatalogItem::Source(_) => {}
1775 CatalogItem::Log(_) => {}
1776 CatalogItem::Table(_) => {}
1777 CatalogItem::Type(_) => {}
1778 CatalogItem::View(view) => uses.extend(view.dependencies.0.iter().copied()),
1779 CatalogItem::MaterializedView(mview) => {
1780 uses.extend(mview.dependencies.0.iter().copied())
1781 }
1782 CatalogItem::ContinualTask(ct) => uses.extend(ct.dependencies.0.iter().copied()),
1783 CatalogItem::Secret(_) => {}
1784 CatalogItem::Connection(_) => {}
1785 }
1786 uses
1787 }
1788
1789 pub fn conn_id(&self) -> Option<&ConnectionId> {
1792 match self {
1793 CatalogItem::View(view) => view.conn_id.as_ref(),
1794 CatalogItem::Index(index) => index.conn_id.as_ref(),
1795 CatalogItem::Table(table) => table.conn_id.as_ref(),
1796 CatalogItem::Log(_)
1797 | CatalogItem::Source(_)
1798 | CatalogItem::Sink(_)
1799 | CatalogItem::MaterializedView(_)
1800 | CatalogItem::Secret(_)
1801 | CatalogItem::Type(_)
1802 | CatalogItem::Func(_)
1803 | CatalogItem::Connection(_)
1804 | CatalogItem::ContinualTask(_) => None,
1805 }
1806 }
1807
1808 pub fn is_temporary(&self) -> bool {
1810 self.conn_id().is_some()
1811 }
1812
1813 pub fn rename_schema_refs(
1814 &self,
1815 database_name: &str,
1816 cur_schema_name: &str,
1817 new_schema_name: &str,
1818 ) -> Result<CatalogItem, (String, String)> {
1819 let do_rewrite = |create_sql: String| -> Result<String, (String, String)> {
1820 let mut create_stmt = mz_sql::parse::parse(&create_sql)
1821 .expect("invalid create sql persisted to catalog")
1822 .into_element()
1823 .ast;
1824
1825 mz_sql::ast::transform::create_stmt_rename_schema_refs(
1827 &mut create_stmt,
1828 database_name,
1829 cur_schema_name,
1830 new_schema_name,
1831 )?;
1832
1833 Ok(create_stmt.to_ast_string_stable())
1834 };
1835
1836 match self {
1837 CatalogItem::Table(i) => {
1838 let mut i = i.clone();
1839 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1840 Ok(CatalogItem::Table(i))
1841 }
1842 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1843 CatalogItem::Source(i) => {
1844 let mut i = i.clone();
1845 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1846 Ok(CatalogItem::Source(i))
1847 }
1848 CatalogItem::Sink(i) => {
1849 let mut i = i.clone();
1850 i.create_sql = do_rewrite(i.create_sql)?;
1851 Ok(CatalogItem::Sink(i))
1852 }
1853 CatalogItem::View(i) => {
1854 let mut i = i.clone();
1855 i.create_sql = do_rewrite(i.create_sql)?;
1856 Ok(CatalogItem::View(i))
1857 }
1858 CatalogItem::MaterializedView(i) => {
1859 let mut i = i.clone();
1860 i.create_sql = do_rewrite(i.create_sql)?;
1861 Ok(CatalogItem::MaterializedView(i))
1862 }
1863 CatalogItem::Index(i) => {
1864 let mut i = i.clone();
1865 i.create_sql = do_rewrite(i.create_sql)?;
1866 Ok(CatalogItem::Index(i))
1867 }
1868 CatalogItem::Secret(i) => {
1869 let mut i = i.clone();
1870 i.create_sql = do_rewrite(i.create_sql)?;
1871 Ok(CatalogItem::Secret(i))
1872 }
1873 CatalogItem::Connection(i) => {
1874 let mut i = i.clone();
1875 i.create_sql = do_rewrite(i.create_sql)?;
1876 Ok(CatalogItem::Connection(i))
1877 }
1878 CatalogItem::Type(i) => {
1879 let mut i = i.clone();
1880 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1881 Ok(CatalogItem::Type(i))
1882 }
1883 CatalogItem::Func(i) => Ok(CatalogItem::Func(i.clone())),
1884 CatalogItem::ContinualTask(i) => {
1885 let mut i = i.clone();
1886 i.create_sql = do_rewrite(i.create_sql)?;
1887 Ok(CatalogItem::ContinualTask(i))
1888 }
1889 }
1890 }
1891
1892 pub fn rename_item_refs(
1896 &self,
1897 from: FullItemName,
1898 to_item_name: String,
1899 rename_self: bool,
1900 ) -> Result<CatalogItem, String> {
1901 let do_rewrite = |create_sql: String| -> Result<String, String> {
1902 let mut create_stmt = mz_sql::parse::parse(&create_sql)
1903 .expect("invalid create sql persisted to catalog")
1904 .into_element()
1905 .ast;
1906 if rename_self {
1907 mz_sql::ast::transform::create_stmt_rename(&mut create_stmt, to_item_name.clone());
1908 }
1909 mz_sql::ast::transform::create_stmt_rename_refs(&mut create_stmt, from, to_item_name)?;
1911 Ok(create_stmt.to_ast_string_stable())
1912 };
1913
1914 match self {
1915 CatalogItem::Table(i) => {
1916 let mut i = i.clone();
1917 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1918 Ok(CatalogItem::Table(i))
1919 }
1920 CatalogItem::Log(i) => Ok(CatalogItem::Log(i.clone())),
1921 CatalogItem::Source(i) => {
1922 let mut i = i.clone();
1923 i.create_sql = i.create_sql.map(do_rewrite).transpose()?;
1924 Ok(CatalogItem::Source(i))
1925 }
1926 CatalogItem::Sink(i) => {
1927 let mut i = i.clone();
1928 i.create_sql = do_rewrite(i.create_sql)?;
1929 Ok(CatalogItem::Sink(i))
1930 }
1931 CatalogItem::View(i) => {
1932 let mut i = i.clone();
1933 i.create_sql = do_rewrite(i.create_sql)?;
1934 Ok(CatalogItem::View(i))
1935 }
1936 CatalogItem::MaterializedView(i) => {
1937 let mut i = i.clone();
1938 i.create_sql = do_rewrite(i.create_sql)?;
1939 Ok(CatalogItem::MaterializedView(i))
1940 }
1941 CatalogItem::Index(i) => {
1942 let mut i = i.clone();
1943 i.create_sql = do_rewrite(i.create_sql)?;
1944 Ok(CatalogItem::Index(i))
1945 }
1946 CatalogItem::Secret(i) => {
1947 let mut i = i.clone();
1948 i.create_sql = do_rewrite(i.create_sql)?;
1949 Ok(CatalogItem::Secret(i))
1950 }
1951 CatalogItem::Func(_) | CatalogItem::Type(_) => {
1952 unreachable!("{}s cannot be renamed", self.typ())
1953 }
1954 CatalogItem::Connection(i) => {
1955 let mut i = i.clone();
1956 i.create_sql = do_rewrite(i.create_sql)?;
1957 Ok(CatalogItem::Connection(i))
1958 }
1959 CatalogItem::ContinualTask(i) => {
1960 let mut i = i.clone();
1961 i.create_sql = do_rewrite(i.create_sql)?;
1962 Ok(CatalogItem::ContinualTask(i))
1963 }
1964 }
1965 }
1966
    /// Rewrites the `RetainHistory` option in this item's `CREATE` SQL and records `window` as
    /// the item's custom logical compaction window.
    ///
    /// * `value`: `Some` sets (or replaces) the option with `RetainHistoryFor(value)`; `None`
    ///   removes the option if it is present.
    /// * `window`: stored as `Some(window)` on the item after the SQL was updated, regardless
    ///   of whether `value` was `Some` or `None`.
    ///
    /// Returns the value of the option that was replaced or removed, or `None` if the option
    /// was not present before.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` when the statement is not a `CREATE TABLE`, `CREATE INDEX`,
    /// `CREATE SOURCE` or `CREATE MATERIALIZED VIEW`, or when `update_sql` fails. The
    /// compaction window is left untouched in that case.
    ///
    /// # Panics
    ///
    /// Panics if the SQL was updated but the item has no custom compaction window slot.
    pub fn update_retain_history(
        &mut self,
        value: Option<Value>,
        window: CompactionWindow,
    ) -> Result<Option<WithOptionValue<Raw>>, ()> {
        let update = |mut ast: &mut Statement<Raw>| {
            // Each statement kind has its own option and option-name types, so the shared
            // logic is instantiated per kind through this local macro.
            macro_rules! update_retain_history {
                ( $stmt:ident, $opt:ident, $name:ident ) => {{
                    // Locate the last occurrence of the option, if any.
                    let pos = $stmt
                        .with_options
                        .iter()
                        .rposition(|o| o.name == mz_sql_parser::ast::$name::RetainHistory);
                    if let Some(value) = value {
                        let next = mz_sql_parser::ast::$opt {
                            name: mz_sql_parser::ast::$name::RetainHistory,
                            value: Some(WithOptionValue::RetainHistoryFor(value)),
                        };
                        if let Some(idx) = pos {
                            // Replace in place and hand back the old value.
                            let previous = $stmt.with_options[idx].clone();
                            $stmt.with_options[idx] = next;
                            previous.value
                        } else {
                            $stmt.with_options.push(next);
                            None
                        }
                    } else {
                        if let Some(idx) = pos {
                            // `swap_remove` moves the last option into the freed slot, so the
                            // relative order of the remaining options is not preserved.
                            $stmt.with_options.swap_remove(idx).value
                        } else {
                            None
                        }
                    }
                }};
            }
            let previous = match &mut ast {
                Statement::CreateTable(stmt) => {
                    update_retain_history!(stmt, TableOption, TableOptionName)
                }
                Statement::CreateIndex(stmt) => {
                    update_retain_history!(stmt, IndexOption, IndexOptionName)
                }
                Statement::CreateSource(stmt) => {
                    update_retain_history!(stmt, CreateSourceOption, CreateSourceOptionName)
                }
                Statement::CreateMaterializedView(stmt) => {
                    update_retain_history!(stmt, MaterializedViewOption, MaterializedViewOptionName)
                }
                _ => {
                    return Err(());
                }
            };
            Ok(previous)
        };

        // Update the SQL first; only touch the in-memory window once that succeeded.
        let res = self.update_sql(update)?;
        let cw = self
            .custom_logical_compaction_window_mut()
            .expect("item must have compaction window");
        *cw = Some(window);
        Ok(res)
    }
2033
2034 pub fn add_column(
2035 &mut self,
2036 name: ColumnName,
2037 typ: SqlColumnType,
2038 sql: RawDataType,
2039 ) -> Result<RelationVersion, PlanError> {
2040 let CatalogItem::Table(table) = self else {
2041 return Err(PlanError::Unsupported {
2042 feature: "adding columns to a non-Table".to_string(),
2043 discussion_no: None,
2044 });
2045 };
2046 let next_version = table.desc.add_column(name.clone(), typ);
2047
2048 let update = |mut ast: &mut Statement<Raw>| match &mut ast {
2049 Statement::CreateTable(stmt) => {
2050 let version = ColumnOptionDef {
2051 name: None,
2052 option: ColumnOption::Versioned {
2053 action: ColumnVersioned::Added,
2054 version: next_version.into(),
2055 },
2056 };
2057 let column = ColumnDef {
2058 name: name.into(),
2059 data_type: sql,
2060 collation: None,
2061 options: vec![version],
2062 };
2063 stmt.columns.push(column);
2064 Ok(())
2065 }
2066 _ => Err(()),
2067 };
2068
2069 self.update_sql(update)
2070 .map_err(|()| PlanError::Unstructured("expected CREATE TABLE statement".to_string()))?;
2071 Ok(next_version)
2072 }
2073
2074 pub fn update_sql<F, T>(&mut self, f: F) -> Result<T, ()>
2077 where
2078 F: FnOnce(&mut Statement<Raw>) -> Result<T, ()>,
2079 {
2080 let create_sql = match self {
2081 CatalogItem::Table(Table { create_sql, .. })
2082 | CatalogItem::Type(Type { create_sql, .. })
2083 | CatalogItem::Source(Source { create_sql, .. }) => create_sql.as_mut(),
2084 CatalogItem::Sink(Sink { create_sql, .. })
2085 | CatalogItem::View(View { create_sql, .. })
2086 | CatalogItem::MaterializedView(MaterializedView { create_sql, .. })
2087 | CatalogItem::Index(Index { create_sql, .. })
2088 | CatalogItem::Secret(Secret { create_sql, .. })
2089 | CatalogItem::Connection(Connection { create_sql, .. })
2090 | CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => Some(create_sql),
2091 CatalogItem::Func(_) | CatalogItem::Log(_) => None,
2092 };
2093 let Some(create_sql) = create_sql else {
2094 return Err(());
2095 };
2096 let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
2097 .expect("non-system items must be parseable")
2098 .into_element()
2099 .ast;
2100 debug!("rewrite: {}", ast.to_ast_string_redacted());
2101 let t = f(&mut ast)?;
2102 *create_sql = ast.to_ast_string_stable();
2103 debug!("rewrote: {}", ast.to_ast_string_redacted());
2104 Ok(t)
2105 }
2106
2107 pub fn is_compute_object_on_cluster(&self) -> Option<ClusterId> {
2114 match self {
2115 CatalogItem::Index(index) => Some(index.cluster_id),
2116 CatalogItem::Table(_)
2117 | CatalogItem::Source(_)
2118 | CatalogItem::Log(_)
2119 | CatalogItem::View(_)
2120 | CatalogItem::MaterializedView(_)
2121 | CatalogItem::Sink(_)
2122 | CatalogItem::Type(_)
2123 | CatalogItem::Func(_)
2124 | CatalogItem::Secret(_)
2125 | CatalogItem::Connection(_)
2126 | CatalogItem::ContinualTask(_) => None,
2127 }
2128 }
2129
2130 pub fn cluster_id(&self) -> Option<ClusterId> {
2131 match self {
2132 CatalogItem::MaterializedView(mv) => Some(mv.cluster_id),
2133 CatalogItem::Index(index) => Some(index.cluster_id),
2134 CatalogItem::Source(source) => match &source.data_source {
2135 DataSourceDesc::Ingestion { cluster_id, .. } => Some(*cluster_id),
2136 DataSourceDesc::IngestionExport { .. } => None,
2140 DataSourceDesc::Webhook { cluster_id, .. } => Some(*cluster_id),
2141 DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => None,
2142 },
2143 CatalogItem::Sink(sink) => Some(sink.cluster_id),
2144 CatalogItem::ContinualTask(ct) => Some(ct.cluster_id),
2145 CatalogItem::Table(_)
2146 | CatalogItem::Log(_)
2147 | CatalogItem::View(_)
2148 | CatalogItem::Type(_)
2149 | CatalogItem::Func(_)
2150 | CatalogItem::Secret(_)
2151 | CatalogItem::Connection(_) => None,
2152 }
2153 }
2154
2155 pub fn custom_logical_compaction_window(&self) -> Option<CompactionWindow> {
2158 match self {
2159 CatalogItem::Table(table) => table.custom_logical_compaction_window,
2160 CatalogItem::Source(source) => source.custom_logical_compaction_window,
2161 CatalogItem::Index(index) => index.custom_logical_compaction_window,
2162 CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
2163 CatalogItem::Log(_)
2164 | CatalogItem::View(_)
2165 | CatalogItem::Sink(_)
2166 | CatalogItem::Type(_)
2167 | CatalogItem::Func(_)
2168 | CatalogItem::Secret(_)
2169 | CatalogItem::Connection(_)
2170 | CatalogItem::ContinualTask(_) => None,
2171 }
2172 }
2173
2174 pub fn custom_logical_compaction_window_mut(
2178 &mut self,
2179 ) -> Option<&mut Option<CompactionWindow>> {
2180 let cw = match self {
2181 CatalogItem::Table(table) => &mut table.custom_logical_compaction_window,
2182 CatalogItem::Source(source) => &mut source.custom_logical_compaction_window,
2183 CatalogItem::Index(index) => &mut index.custom_logical_compaction_window,
2184 CatalogItem::MaterializedView(mview) => &mut mview.custom_logical_compaction_window,
2185 CatalogItem::Log(_)
2186 | CatalogItem::View(_)
2187 | CatalogItem::Sink(_)
2188 | CatalogItem::Type(_)
2189 | CatalogItem::Func(_)
2190 | CatalogItem::Secret(_)
2191 | CatalogItem::Connection(_)
2192 | CatalogItem::ContinualTask(_) => return None,
2193 };
2194 Some(cw)
2195 }
2196
2197 pub fn initial_logical_compaction_window(&self) -> Option<CompactionWindow> {
2205 let custom_logical_compaction_window = match self {
2206 CatalogItem::Table(_)
2207 | CatalogItem::Source(_)
2208 | CatalogItem::Index(_)
2209 | CatalogItem::MaterializedView(_)
2210 | CatalogItem::ContinualTask(_) => self.custom_logical_compaction_window(),
2211 CatalogItem::Log(_)
2212 | CatalogItem::View(_)
2213 | CatalogItem::Sink(_)
2214 | CatalogItem::Type(_)
2215 | CatalogItem::Func(_)
2216 | CatalogItem::Secret(_)
2217 | CatalogItem::Connection(_) => return None,
2218 };
2219 Some(custom_logical_compaction_window.unwrap_or(CompactionWindow::Default))
2220 }
2221
2222 pub fn is_retained_metrics_object(&self) -> bool {
2226 match self {
2227 CatalogItem::Table(table) => table.is_retained_metrics_object,
2228 CatalogItem::Source(source) => source.is_retained_metrics_object,
2229 CatalogItem::Index(index) => index.is_retained_metrics_object,
2230 CatalogItem::Log(_)
2231 | CatalogItem::View(_)
2232 | CatalogItem::MaterializedView(_)
2233 | CatalogItem::Sink(_)
2234 | CatalogItem::Type(_)
2235 | CatalogItem::Func(_)
2236 | CatalogItem::Secret(_)
2237 | CatalogItem::Connection(_)
2238 | CatalogItem::ContinualTask(_) => false,
2239 }
2240 }
2241
2242 pub fn to_serialized(&self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2243 match self {
2244 CatalogItem::Table(table) => {
2245 let create_sql = table
2246 .create_sql
2247 .clone()
2248 .expect("builtin tables cannot be serialized");
2249 let mut collections = table.collections.clone();
2250 let global_id = collections
2251 .remove(&RelationVersion::root())
2252 .expect("at least one version");
2253 (create_sql, global_id, collections)
2254 }
2255 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2256 CatalogItem::Source(source) => {
2257 assert!(
2258 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2259 "cannot serialize introspection/builtin sources",
2260 );
2261 let create_sql = source
2262 .create_sql
2263 .clone()
2264 .expect("builtin sources cannot be serialized");
2265 (create_sql, source.global_id, BTreeMap::new())
2266 }
2267 CatalogItem::View(view) => (view.create_sql.clone(), view.global_id, BTreeMap::new()),
2268 CatalogItem::MaterializedView(mview) => {
2269 (mview.create_sql.clone(), mview.global_id, BTreeMap::new())
2270 }
2271 CatalogItem::Index(index) => {
2272 (index.create_sql.clone(), index.global_id, BTreeMap::new())
2273 }
2274 CatalogItem::Sink(sink) => (sink.create_sql.clone(), sink.global_id, BTreeMap::new()),
2275 CatalogItem::Type(typ) => {
2276 let create_sql = typ
2277 .create_sql
2278 .clone()
2279 .expect("builtin types cannot be serialized");
2280 (create_sql, typ.global_id, BTreeMap::new())
2281 }
2282 CatalogItem::Secret(secret) => {
2283 (secret.create_sql.clone(), secret.global_id, BTreeMap::new())
2284 }
2285 CatalogItem::Connection(connection) => (
2286 connection.create_sql.clone(),
2287 connection.global_id,
2288 BTreeMap::new(),
2289 ),
2290 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2291 CatalogItem::ContinualTask(ct) => {
2292 (ct.create_sql.clone(), ct.global_id, BTreeMap::new())
2293 }
2294 }
2295 }
2296
2297 pub fn into_serialized(self) -> (String, GlobalId, BTreeMap<RelationVersion, GlobalId>) {
2298 match self {
2299 CatalogItem::Table(mut table) => {
2300 let create_sql = table
2301 .create_sql
2302 .expect("builtin tables cannot be serialized");
2303 let global_id = table
2304 .collections
2305 .remove(&RelationVersion::root())
2306 .expect("at least one version");
2307 (create_sql, global_id, table.collections)
2308 }
2309 CatalogItem::Log(_) => unreachable!("builtin logs cannot be serialized"),
2310 CatalogItem::Source(source) => {
2311 assert!(
2312 !matches!(source.data_source, DataSourceDesc::Introspection(_)),
2313 "cannot serialize introspection/builtin sources",
2314 );
2315 let create_sql = source
2316 .create_sql
2317 .expect("builtin sources cannot be serialized");
2318 (create_sql, source.global_id, BTreeMap::new())
2319 }
2320 CatalogItem::View(view) => (view.create_sql, view.global_id, BTreeMap::new()),
2321 CatalogItem::MaterializedView(mview) => {
2322 (mview.create_sql, mview.global_id, BTreeMap::new())
2323 }
2324 CatalogItem::Index(index) => (index.create_sql, index.global_id, BTreeMap::new()),
2325 CatalogItem::Sink(sink) => (sink.create_sql, sink.global_id, BTreeMap::new()),
2326 CatalogItem::Type(typ) => {
2327 let create_sql = typ.create_sql.expect("builtin types cannot be serialized");
2328 (create_sql, typ.global_id, BTreeMap::new())
2329 }
2330 CatalogItem::Secret(secret) => (secret.create_sql, secret.global_id, BTreeMap::new()),
2331 CatalogItem::Connection(connection) => {
2332 (connection.create_sql, connection.global_id, BTreeMap::new())
2333 }
2334 CatalogItem::Func(_) => unreachable!("cannot serialize functions yet"),
2335 CatalogItem::ContinualTask(ct) => (ct.create_sql, ct.global_id, BTreeMap::new()),
2336 }
2337 }
2338}
2339
2340impl CatalogEntry {
2341 pub fn desc_latest(
2346 &self,
2347 name: &FullItemName,
2348 ) -> Result<Cow<'_, RelationDesc>, SqlCatalogError> {
2349 self.item.desc(name, RelationVersionSelector::Latest)
2350 }
2351
2352 pub fn desc_opt_latest(&self) -> Option<Cow<'_, RelationDesc>> {
2355 self.item.desc_opt(RelationVersionSelector::Latest)
2356 }
2357
2358 pub fn has_columns(&self) -> bool {
2360 self.desc_opt_latest().is_some()
2361 }
2362
2363 pub fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
2365 self.item.func(self)
2366 }
2367
2368 pub fn index(&self) -> Option<&Index> {
2370 match self.item() {
2371 CatalogItem::Index(idx) => Some(idx),
2372 _ => None,
2373 }
2374 }
2375
2376 pub fn materialized_view(&self) -> Option<&MaterializedView> {
2378 match self.item() {
2379 CatalogItem::MaterializedView(mv) => Some(mv),
2380 _ => None,
2381 }
2382 }
2383
2384 pub fn table(&self) -> Option<&Table> {
2386 match self.item() {
2387 CatalogItem::Table(tbl) => Some(tbl),
2388 _ => None,
2389 }
2390 }
2391
2392 pub fn source(&self) -> Option<&Source> {
2394 match self.item() {
2395 CatalogItem::Source(src) => Some(src),
2396 _ => None,
2397 }
2398 }
2399
2400 pub fn sink(&self) -> Option<&Sink> {
2402 match self.item() {
2403 CatalogItem::Sink(sink) => Some(sink),
2404 _ => None,
2405 }
2406 }
2407
2408 pub fn secret(&self) -> Option<&Secret> {
2410 match self.item() {
2411 CatalogItem::Secret(secret) => Some(secret),
2412 _ => None,
2413 }
2414 }
2415
2416 pub fn connection(&self) -> Result<&Connection, SqlCatalogError> {
2417 match self.item() {
2418 CatalogItem::Connection(connection) => Ok(connection),
2419 _ => {
2420 let db_name = match self.name().qualifiers.database_spec {
2421 ResolvedDatabaseSpecifier::Ambient => "".to_string(),
2422 ResolvedDatabaseSpecifier::Id(id) => format!("{id}."),
2423 };
2424 Err(SqlCatalogError::UnknownConnection(format!(
2425 "{}{}.{}",
2426 db_name,
2427 self.name().qualifiers.schema_spec,
2428 self.name().item
2429 )))
2430 }
2431 }
2432 }
2433
2434 pub fn source_desc(
2437 &self,
2438 ) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
2439 self.item.source_desc(self)
2440 }
2441
2442 pub fn is_connection(&self) -> bool {
2444 matches!(self.item(), CatalogItem::Connection(_))
2445 }
2446
2447 pub fn is_table(&self) -> bool {
2449 matches!(self.item(), CatalogItem::Table(_))
2450 }
2451
2452 pub fn is_source(&self) -> bool {
2455 matches!(self.item(), CatalogItem::Source(_))
2456 }
2457
2458 pub fn subsource_details(
2461 &self,
2462 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
2463 match &self.item() {
2464 CatalogItem::Source(source) => match &source.data_source {
2465 DataSourceDesc::IngestionExport {
2466 ingestion_id,
2467 external_reference,
2468 details,
2469 data_config: _,
2470 } => Some((*ingestion_id, external_reference, details)),
2471 _ => None,
2472 },
2473 _ => None,
2474 }
2475 }
2476
2477 pub fn source_export_details(
2480 &self,
2481 ) -> Option<(
2482 CatalogItemId,
2483 &UnresolvedItemName,
2484 &SourceExportDetails,
2485 &SourceExportDataConfig<ReferencedConnection>,
2486 )> {
2487 match &self.item() {
2488 CatalogItem::Source(source) => match &source.data_source {
2489 DataSourceDesc::IngestionExport {
2490 ingestion_id,
2491 external_reference,
2492 details,
2493 data_config,
2494 } => Some((*ingestion_id, external_reference, details, data_config)),
2495 _ => None,
2496 },
2497 CatalogItem::Table(table) => match &table.data_source {
2498 TableDataSource::DataSource {
2499 desc:
2500 DataSourceDesc::IngestionExport {
2501 ingestion_id,
2502 external_reference,
2503 details,
2504 data_config,
2505 },
2506 timeline: _,
2507 } => Some((*ingestion_id, external_reference, details, data_config)),
2508 _ => None,
2509 },
2510 _ => None,
2511 }
2512 }
2513
    /// Reports whether the wrapped item is a progress source, as decided by
    /// the item's own `is_progress_source`.
    pub fn is_progress_source(&self) -> bool {
        self.item().is_progress_source()
    }
2518
2519 pub fn progress_id(&self) -> Option<CatalogItemId> {
2521 match &self.item() {
2522 CatalogItem::Source(source) => match &source.data_source {
2523 DataSourceDesc::Ingestion { ingestion_desc, .. } => {
2524 Some(ingestion_desc.progress_subsource)
2525 }
2526 DataSourceDesc::IngestionExport { .. }
2527 | DataSourceDesc::Introspection(_)
2528 | DataSourceDesc::Progress
2529 | DataSourceDesc::Webhook { .. } => None,
2530 },
2531 CatalogItem::Table(_)
2532 | CatalogItem::Log(_)
2533 | CatalogItem::View(_)
2534 | CatalogItem::MaterializedView(_)
2535 | CatalogItem::Sink(_)
2536 | CatalogItem::Index(_)
2537 | CatalogItem::Type(_)
2538 | CatalogItem::Func(_)
2539 | CatalogItem::Secret(_)
2540 | CatalogItem::Connection(_)
2541 | CatalogItem::ContinualTask(_) => None,
2542 }
2543 }
2544
    /// Reports whether this entry's item is a [`CatalogItem::Sink`].
    pub fn is_sink(&self) -> bool {
        matches!(self.item(), CatalogItem::Sink(_))
    }
2549
    /// Reports whether this entry's item is a [`CatalogItem::MaterializedView`].
    pub fn is_materialized_view(&self) -> bool {
        matches!(self.item(), CatalogItem::MaterializedView(_))
    }
2554
    /// Reports whether this entry's item is a [`CatalogItem::View`].
    ///
    /// Materialized views are a separate variant and do not count.
    pub fn is_view(&self) -> bool {
        matches!(self.item(), CatalogItem::View(_))
    }
2559
    /// Reports whether this entry's item is a [`CatalogItem::Secret`].
    pub fn is_secret(&self) -> bool {
        matches!(self.item(), CatalogItem::Secret(_))
    }
2564
    /// Reports whether this entry's item is a [`CatalogItem::Log`], the
    /// variant this method treats as an introspection source.
    pub fn is_introspection_source(&self) -> bool {
        matches!(self.item(), CatalogItem::Log(_))
    }
2569
    /// Reports whether this entry's item is a [`CatalogItem::Index`].
    pub fn is_index(&self) -> bool {
        matches!(self.item(), CatalogItem::Index(_))
    }
2574
    /// Reports whether this entry's item is a [`CatalogItem::ContinualTask`].
    pub fn is_continual_task(&self) -> bool {
        matches!(self.item(), CatalogItem::ContinualTask(_))
    }
2579
    /// Reports whether this entry is a relation.
    ///
    /// The entry's item type is converted to an `mz_sql::catalog::ObjectType`
    /// and that type's `is_relation` gives the answer.
    pub fn is_relation(&self) -> bool {
        mz_sql::catalog::ObjectType::from(self.item_type()).is_relation()
    }
2584
    /// Returns the [`ResolvedIds`] reported by the wrapped item's `references`.
    pub fn references(&self) -> &ResolvedIds {
        self.item.references()
    }
2590
    /// Returns the set of item IDs reported by the wrapped item's `uses`.
    ///
    /// The set is returned by value; it is not a borrow of stored state.
    pub fn uses(&self) -> BTreeSet<CatalogItemId> {
        self.item.uses()
    }
2599
    /// Returns a reference to the [`CatalogItem`] stored in this entry.
    pub fn item(&self) -> &CatalogItem {
        &self.item
    }
2604
    /// Returns the [`CatalogItemId`] stored in this entry.
    pub fn id(&self) -> CatalogItemId {
        self.id
    }
2609
    /// Returns an iterator over the [`GlobalId`]s reported by the wrapped
    /// item's `global_ids`. The iterator borrows from this entry.
    pub fn global_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.item().global_ids()
    }
2614
    /// Returns the [`GlobalId`] reported by the wrapped item's
    /// `latest_global_id`.
    pub fn latest_global_id(&self) -> GlobalId {
        self.item().latest_global_id()
    }
2618
    /// Returns the OID stored in this entry.
    pub fn oid(&self) -> u32 {
        self.oid
    }
2623
    /// Returns the qualified name stored in this entry.
    pub fn name(&self) -> &QualifiedItemName {
        &self.name
    }
2628
    /// Returns the item IDs stored in this entry's `referenced_by` list, as a
    /// slice.
    pub fn referenced_by(&self) -> &[CatalogItemId] {
        &self.referenced_by
    }
2633
    /// Returns the item IDs stored in this entry's `used_by` list, as a slice.
    pub fn used_by(&self) -> &[CatalogItemId] {
        &self.used_by
    }
2638
    /// Returns the connection ID reported by the wrapped item's `conn_id`,
    /// if it reports one.
    pub fn conn_id(&self) -> Option<&ConnectionId> {
        self.item.conn_id()
    }
2644
    /// Returns a reference to the owner's [`RoleId`] stored in this entry.
    pub fn owner_id(&self) -> &RoleId {
        &self.owner_id
    }
2649
    /// Returns the [`PrivilegeMap`] stored in this entry.
    pub fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
2654}
2655
/// Comments attached to catalog objects.
///
/// Comments are grouped per object and, within an object, keyed by an
/// optional sub-component index.
#[derive(Debug, Clone, Default)]
pub struct CommentsMap {
    /// Outer key: the commented object. Inner key: the optional
    /// sub-component index.
    // NOTE(review): presumably `None` means the object itself and `Some(n)` a
    // column position — confirm against callers.
    map: BTreeMap<CommentObjectId, BTreeMap<Option<usize>, String>>,
}
2660
2661impl CommentsMap {
2662 pub fn update_comment(
2663 &mut self,
2664 object_id: CommentObjectId,
2665 sub_component: Option<usize>,
2666 comment: Option<String>,
2667 ) -> Option<String> {
2668 let object_comments = self.map.entry(object_id).or_default();
2669
2670 let (empty, prev) = if let Some(comment) = comment {
2672 let prev = object_comments.insert(sub_component, comment);
2673 (false, prev)
2674 } else {
2675 let prev = object_comments.remove(&sub_component);
2676 (object_comments.is_empty(), prev)
2677 };
2678
2679 if empty {
2681 self.map.remove(&object_id);
2682 }
2683
2684 prev
2686 }
2687
2688 pub fn drop_comments(
2694 &mut self,
2695 object_ids: &BTreeSet<CommentObjectId>,
2696 ) -> Vec<(CommentObjectId, Option<usize>, String)> {
2697 let mut removed_comments = Vec::new();
2698
2699 for object_id in object_ids {
2700 if let Some(comments) = self.map.remove(object_id) {
2701 let removed = comments
2702 .into_iter()
2703 .map(|(sub_comp, comment)| (object_id.clone(), sub_comp, comment));
2704 removed_comments.extend(removed);
2705 }
2706 }
2707
2708 removed_comments
2709 }
2710
2711 pub fn iter(&self) -> impl Iterator<Item = (CommentObjectId, Option<usize>, &str)> {
2712 self.map
2713 .iter()
2714 .map(|(id, comments)| {
2715 comments
2716 .iter()
2717 .map(|(pos, comment)| (*id, *pos, comment.as_str()))
2718 })
2719 .flatten()
2720 }
2721
2722 pub fn get_object_comments(
2723 &self,
2724 object_id: CommentObjectId,
2725 ) -> Option<&BTreeMap<Option<usize>, String>> {
2726 self.map.get(&object_id)
2727 }
2728}
2729
2730impl Serialize for CommentsMap {
2731 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2732 where
2733 S: serde::Serializer,
2734 {
2735 let comment_count = self
2736 .map
2737 .iter()
2738 .map(|(_object_id, comments)| comments.len())
2739 .sum();
2740
2741 let mut seq = serializer.serialize_seq(Some(comment_count))?;
2742 for (object_id, sub) in &self.map {
2743 for (sub_component, comment) in sub {
2744 seq.serialize_element(&(
2745 format!("{object_id:?}"),
2746 format!("{sub_component:?}"),
2747 comment,
2748 ))?;
2749 }
2750 }
2751 seq.end()
2752 }
2753}
2754
/// Default privileges, grouped by the [`DefaultPrivilegeObject`] they apply to.
///
/// Each object maps to the per-grantee privileges held for it.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
pub struct DefaultPrivileges {
    /// Per-object default privileges. Map keys are written as strings when
    /// serialized.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    privileges: BTreeMap<DefaultPrivilegeObject, RoleDefaultPrivileges>,
}
2760
/// Newtype around a map from grantee [`RoleId`] to that grantee's
/// [`DefaultPrivilegeAclItem`].
///
/// The wrapper lets the map's keys be written as strings when serialized.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Default)]
struct RoleDefaultPrivileges(
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    BTreeMap<RoleId, DefaultPrivilegeAclItem>,
);
2769
2770impl Deref for RoleDefaultPrivileges {
2771 type Target = BTreeMap<RoleId, DefaultPrivilegeAclItem>;
2772
2773 fn deref(&self) -> &Self::Target {
2774 &self.0
2775 }
2776}
2777
2778impl DerefMut for RoleDefaultPrivileges {
2779 fn deref_mut(&mut self) -> &mut Self::Target {
2780 &mut self.0
2781 }
2782}
2783
2784impl DefaultPrivileges {
2785 pub fn grant(&mut self, object: DefaultPrivilegeObject, privilege: DefaultPrivilegeAclItem) {
2787 if privilege.acl_mode.is_empty() {
2788 return;
2789 }
2790
2791 let privileges = self.privileges.entry(object).or_default();
2792 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2793 default_privilege.acl_mode |= privilege.acl_mode;
2794 } else {
2795 privileges.insert(privilege.grantee, privilege);
2796 }
2797 }
2798
2799 pub fn revoke(&mut self, object: &DefaultPrivilegeObject, privilege: &DefaultPrivilegeAclItem) {
2801 if let Some(privileges) = self.privileges.get_mut(object) {
2802 if let Some(default_privilege) = privileges.get_mut(&privilege.grantee) {
2803 default_privilege.acl_mode =
2804 default_privilege.acl_mode.difference(privilege.acl_mode);
2805 if default_privilege.acl_mode.is_empty() {
2806 privileges.remove(&privilege.grantee);
2807 }
2808 }
2809 if privileges.is_empty() {
2810 self.privileges.remove(object);
2811 }
2812 }
2813 }
2814
2815 pub fn get_privileges_for_grantee(
2818 &self,
2819 object: &DefaultPrivilegeObject,
2820 grantee: &RoleId,
2821 ) -> Option<&AclMode> {
2822 self.privileges
2823 .get(object)
2824 .and_then(|privileges| privileges.get(grantee))
2825 .map(|privilege| &privilege.acl_mode)
2826 }
2827
2828 pub fn get_applicable_privileges(
2830 &self,
2831 role_id: RoleId,
2832 database_id: Option<DatabaseId>,
2833 schema_id: Option<SchemaId>,
2834 object_type: mz_sql::catalog::ObjectType,
2835 ) -> impl Iterator<Item = DefaultPrivilegeAclItem> + '_ {
2836 let privilege_object_type = if object_type.is_relation() {
2840 mz_sql::catalog::ObjectType::Table
2841 } else {
2842 object_type
2843 };
2844 let valid_acl_mode = rbac::all_object_privileges(SystemObjectType::Object(object_type));
2845
2846 [
2850 DefaultPrivilegeObject {
2851 role_id,
2852 database_id,
2853 schema_id,
2854 object_type: privilege_object_type,
2855 },
2856 DefaultPrivilegeObject {
2857 role_id,
2858 database_id,
2859 schema_id: None,
2860 object_type: privilege_object_type,
2861 },
2862 DefaultPrivilegeObject {
2863 role_id,
2864 database_id: None,
2865 schema_id: None,
2866 object_type: privilege_object_type,
2867 },
2868 DefaultPrivilegeObject {
2869 role_id: RoleId::Public,
2870 database_id,
2871 schema_id,
2872 object_type: privilege_object_type,
2873 },
2874 DefaultPrivilegeObject {
2875 role_id: RoleId::Public,
2876 database_id,
2877 schema_id: None,
2878 object_type: privilege_object_type,
2879 },
2880 DefaultPrivilegeObject {
2881 role_id: RoleId::Public,
2882 database_id: None,
2883 schema_id: None,
2884 object_type: privilege_object_type,
2885 },
2886 ]
2887 .into_iter()
2888 .filter_map(|object| self.privileges.get(&object))
2889 .flat_map(|acl_map| acl_map.values())
2890 .fold(
2892 BTreeMap::new(),
2893 |mut accum, DefaultPrivilegeAclItem { grantee, acl_mode }| {
2894 let accum_acl_mode = accum.entry(grantee).or_insert_with(AclMode::empty);
2895 *accum_acl_mode |= *acl_mode;
2896 accum
2897 },
2898 )
2899 .into_iter()
2900 .map(move |(grantee, acl_mode)| (grantee, acl_mode & valid_acl_mode))
2905 .filter(|(_, acl_mode)| !acl_mode.is_empty())
2907 .map(|(grantee, acl_mode)| DefaultPrivilegeAclItem {
2908 grantee: *grantee,
2909 acl_mode,
2910 })
2911 }
2912
2913 pub fn iter(
2914 &self,
2915 ) -> impl Iterator<
2916 Item = (
2917 &DefaultPrivilegeObject,
2918 impl Iterator<Item = &DefaultPrivilegeAclItem>,
2919 ),
2920 > {
2921 self.privileges
2922 .iter()
2923 .map(|(object, acl_map)| (object, acl_map.values()))
2924 }
2925}
2926
/// In-memory configuration of a cluster.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterConfig {
    /// Whether the cluster is managed, and its managed settings if so.
    pub variant: ClusterVariant,
    /// Optional workload class string attached to the cluster.
    pub workload_class: Option<String>,
}
2932
2933impl ClusterConfig {
2934 pub fn features(&self) -> Option<&OptimizerFeatureOverrides> {
2935 match &self.variant {
2936 ClusterVariant::Managed(managed) => Some(&managed.optimizer_feature_overrides),
2937 ClusterVariant::Unmanaged => None,
2938 }
2939 }
2940}
2941
2942impl From<ClusterConfig> for durable::ClusterConfig {
2943 fn from(config: ClusterConfig) -> Self {
2944 Self {
2945 variant: config.variant.into(),
2946 workload_class: config.workload_class,
2947 }
2948 }
2949}
2950
2951impl From<durable::ClusterConfig> for ClusterConfig {
2952 fn from(config: durable::ClusterConfig) -> Self {
2953 Self {
2954 variant: config.variant.into(),
2955 workload_class: config.workload_class,
2956 }
2957 }
2958}
2959
/// Settings of a managed cluster.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub struct ClusterVariantManaged {
    /// Size of the cluster, as a string.
    pub size: String,
    /// Availability zones configured for the cluster.
    pub availability_zones: Vec<String>,
    /// Replica logging configuration.
    pub logging: ReplicaLogging,
    /// Configured replication factor.
    pub replication_factor: u32,
    /// Optimizer feature overrides for this cluster.
    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
    /// Schedule configured for the cluster.
    pub schedule: ClusterSchedule,
}
2969
2970impl From<ClusterVariantManaged> for durable::ClusterVariantManaged {
2971 fn from(managed: ClusterVariantManaged) -> Self {
2972 Self {
2973 size: managed.size,
2974 availability_zones: managed.availability_zones,
2975 logging: managed.logging,
2976 replication_factor: managed.replication_factor,
2977 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
2978 schedule: managed.schedule,
2979 }
2980 }
2981}
2982
2983impl From<durable::ClusterVariantManaged> for ClusterVariantManaged {
2984 fn from(managed: durable::ClusterVariantManaged) -> Self {
2985 Self {
2986 size: managed.size,
2987 availability_zones: managed.availability_zones,
2988 logging: managed.logging,
2989 replication_factor: managed.replication_factor,
2990 optimizer_feature_overrides: managed.optimizer_feature_overrides.into(),
2991 schedule: managed.schedule,
2992 }
2993 }
2994}
2995
/// Distinguishes managed clusters from unmanaged ones.
#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
pub enum ClusterVariant {
    /// A managed cluster together with its settings.
    Managed(ClusterVariantManaged),
    /// An unmanaged cluster; it carries no settings here.
    Unmanaged,
}
3001
3002impl From<ClusterVariant> for durable::ClusterVariant {
3003 fn from(variant: ClusterVariant) -> Self {
3004 match variant {
3005 ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3006 ClusterVariant::Unmanaged => Self::Unmanaged,
3007 }
3008 }
3009}
3010
3011impl From<durable::ClusterVariant> for ClusterVariant {
3012 fn from(variant: durable::ClusterVariant) -> Self {
3013 match variant {
3014 durable::ClusterVariant::Managed(managed) => Self::Managed(managed.into()),
3015 durable::ClusterVariant::Unmanaged => Self::Unmanaged,
3016 }
3017 }
3018}
3019
/// Exposes [`Database`] through the SQL layer's `CatalogDatabase` trait.
impl mz_sql::catalog::CatalogDatabase for Database {
    /// Returns the database's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the database's ID.
    fn id(&self) -> DatabaseId {
        self.id
    }

    /// Reports whether the by-name schema index contains at least one schema.
    fn has_schemas(&self) -> bool {
        !self.schemas_by_name.is_empty()
    }

    /// Returns the map from schema name to schema ID.
    fn schema_ids(&self) -> &BTreeMap<String, SchemaId> {
        &self.schemas_by_name
    }

    /// Returns every schema in the by-ID index as a trait object.
    // The `as` cast only turns a concrete reference into a trait object.
    #[allow(clippy::as_conversions)]
    fn schemas(&self) -> Vec<&dyn CatalogSchema> {
        self.schemas_by_id
            .values()
            .map(|schema| schema as &dyn CatalogSchema)
            .collect()
    }

    /// Returns the ID of the role that owns the database.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges stored on the database.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3054
/// Exposes [`Schema`] through the SQL layer's `CatalogSchema` trait.
impl mz_sql::catalog::CatalogSchema for Schema {
    /// Returns the database specifier taken from the schema's qualified name.
    fn database(&self) -> &ResolvedDatabaseSpecifier {
        &self.name.database
    }

    /// Returns the schema's qualified name.
    fn name(&self) -> &QualifiedSchemaName {
        &self.name
    }

    /// Returns the schema's specifier.
    fn id(&self) -> &SchemaSpecifier {
        &self.id
    }

    /// Reports whether the schema's `items` collection is non-empty.
    ///
    /// Only `items` is consulted; `functions` and `types` are not.
    fn has_items(&self) -> bool {
        !self.items.is_empty()
    }

    /// Iterates over the IDs stored in `items`, then `functions`, then
    /// `types`.
    fn item_ids(&self) -> Box<dyn Iterator<Item = CatalogItemId> + '_> {
        Box::new(
            self.items
                .values()
                .chain(self.functions.values())
                .chain(self.types.values())
                .copied(),
        )
    }

    /// Returns the ID of the role that owns the schema.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges stored on the schema.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3090
/// Exposes [`Role`] through the SQL layer's `CatalogRole` trait.
impl mz_sql::catalog::CatalogRole for Role {
    /// Returns the role's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the role's ID.
    fn id(&self) -> RoleId {
        self.id
    }

    /// Returns the map stored inside the role's membership record.
    // NOTE(review): the meaning of the key and value role IDs is not visible
    // here — confirm against `RoleMembership`.
    fn membership(&self) -> &BTreeMap<RoleId, RoleId> {
        &self.membership.map
    }

    /// Returns the role's attributes.
    fn attributes(&self) -> &RoleAttributes {
        &self.attributes
    }

    /// Returns the role's variables, keyed by variable name.
    fn vars(&self) -> &BTreeMap<String, OwnedVarInput> {
        &self.vars.map
    }
}
3112
/// Exposes [`NetworkPolicy`] through the SQL layer's `CatalogNetworkPolicy`
/// trait.
impl mz_sql::catalog::CatalogNetworkPolicy for NetworkPolicy {
    /// Returns the policy's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the policy's ID.
    fn id(&self) -> NetworkPolicyId {
        self.id
    }

    /// Returns the ID of the role that owns the policy.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Returns the privileges stored on the policy.
    fn privileges(&self) -> &PrivilegeMap {
        &self.privileges
    }
}
3130
3131impl mz_sql::catalog::CatalogCluster<'_> for Cluster {
3132 fn name(&self) -> &str {
3133 &self.name
3134 }
3135
3136 fn id(&self) -> ClusterId {
3137 self.id
3138 }
3139
3140 fn bound_objects(&self) -> &BTreeSet<CatalogItemId> {
3141 &self.bound_objects
3142 }
3143
3144 fn replica_ids(&self) -> &BTreeMap<String, ReplicaId> {
3145 &self.replica_id_by_name_
3146 }
3147
3148 #[allow(clippy::as_conversions)]
3150 fn replicas(&self) -> Vec<&dyn CatalogClusterReplica<'_>> {
3151 self.replicas()
3152 .map(|replica| replica as &dyn CatalogClusterReplica)
3153 .collect()
3154 }
3155
3156 fn replica(&self, id: ReplicaId) -> &dyn CatalogClusterReplica<'_> {
3157 self.replica(id).expect("catalog out of sync")
3158 }
3159
3160 fn owner_id(&self) -> RoleId {
3161 self.owner_id
3162 }
3163
3164 fn privileges(&self) -> &PrivilegeMap {
3165 &self.privileges
3166 }
3167
3168 fn is_managed(&self) -> bool {
3169 self.is_managed()
3170 }
3171
3172 fn managed_size(&self) -> Option<&str> {
3173 match &self.config.variant {
3174 ClusterVariant::Managed(ClusterVariantManaged { size, .. }) => Some(size),
3175 _ => None,
3176 }
3177 }
3178
3179 fn schedule(&self) -> Option<&ClusterSchedule> {
3180 match &self.config.variant {
3181 ClusterVariant::Managed(ClusterVariantManaged { schedule, .. }) => Some(schedule),
3182 _ => None,
3183 }
3184 }
3185
3186 fn try_to_plan(&self) -> Result<CreateClusterPlan, PlanError> {
3187 self.try_to_plan()
3188 }
3189}
3190
/// Exposes [`ClusterReplica`] through the SQL layer's `CatalogClusterReplica`
/// trait.
impl mz_sql::catalog::CatalogClusterReplica<'_> for ClusterReplica {
    /// Returns the replica's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the ID of the cluster this replica belongs to.
    fn cluster_id(&self) -> ClusterId {
        self.cluster_id
    }

    /// Returns the replica's ID.
    fn replica_id(&self) -> ReplicaId {
        self.replica_id
    }

    /// Returns the ID of the role that owns the replica.
    fn owner_id(&self) -> RoleId {
        self.owner_id
    }

    /// Reports whether the replica is internal, as decided by the location in
    /// its configuration.
    fn internal(&self) -> bool {
        self.config.location.internal()
    }
}
3212
3213impl mz_sql::catalog::CatalogItem for CatalogEntry {
3214 fn name(&self) -> &QualifiedItemName {
3215 self.name()
3216 }
3217
3218 fn id(&self) -> CatalogItemId {
3219 self.id()
3220 }
3221
3222 fn global_ids(&self) -> Box<dyn Iterator<Item = GlobalId> + '_> {
3223 Box::new(self.global_ids())
3224 }
3225
3226 fn oid(&self) -> u32 {
3227 self.oid()
3228 }
3229
3230 fn func(&self) -> Result<&'static mz_sql::func::Func, SqlCatalogError> {
3231 self.func()
3232 }
3233
3234 fn source_desc(&self) -> Result<Option<&SourceDesc<ReferencedConnection>>, SqlCatalogError> {
3235 self.source_desc()
3236 }
3237
3238 fn connection(
3239 &self,
3240 ) -> Result<mz_storage_types::connections::Connection<ReferencedConnection>, SqlCatalogError>
3241 {
3242 Ok(self.connection()?.details.to_connection())
3243 }
3244
3245 fn create_sql(&self) -> &str {
3246 match self.item() {
3247 CatalogItem::Table(Table { create_sql, .. }) => {
3248 create_sql.as_deref().unwrap_or("<builtin>")
3249 }
3250 CatalogItem::Source(Source { create_sql, .. }) => {
3251 create_sql.as_deref().unwrap_or("<builtin>")
3252 }
3253 CatalogItem::Sink(Sink { create_sql, .. }) => create_sql,
3254 CatalogItem::View(View { create_sql, .. }) => create_sql,
3255 CatalogItem::MaterializedView(MaterializedView { create_sql, .. }) => create_sql,
3256 CatalogItem::Index(Index { create_sql, .. }) => create_sql,
3257 CatalogItem::Type(Type { create_sql, .. }) => {
3258 create_sql.as_deref().unwrap_or("<builtin>")
3259 }
3260 CatalogItem::Secret(Secret { create_sql, .. }) => create_sql,
3261 CatalogItem::Connection(Connection { create_sql, .. }) => create_sql,
3262 CatalogItem::Func(_) => "<builtin>",
3263 CatalogItem::Log(_) => "<builtin>",
3264 CatalogItem::ContinualTask(ContinualTask { create_sql, .. }) => create_sql,
3265 }
3266 }
3267
3268 fn item_type(&self) -> SqlCatalogItemType {
3269 self.item().typ()
3270 }
3271
3272 fn index_details(&self) -> Option<(&[MirScalarExpr], GlobalId)> {
3273 if let CatalogItem::Index(Index { keys, on, .. }) = self.item() {
3274 Some((keys, *on))
3275 } else {
3276 None
3277 }
3278 }
3279
3280 fn writable_table_details(&self) -> Option<&[Expr<Aug>]> {
3281 if let CatalogItem::Table(Table {
3282 data_source: TableDataSource::TableWrites { defaults },
3283 ..
3284 }) = self.item()
3285 {
3286 Some(defaults.as_slice())
3287 } else {
3288 None
3289 }
3290 }
3291
3292 fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
3293 if let CatalogItem::Type(Type { details, .. }) = self.item() {
3294 Some(details)
3295 } else {
3296 None
3297 }
3298 }
3299
3300 fn references(&self) -> &ResolvedIds {
3301 self.references()
3302 }
3303
3304 fn uses(&self) -> BTreeSet<CatalogItemId> {
3305 self.uses()
3306 }
3307
3308 fn referenced_by(&self) -> &[CatalogItemId] {
3309 self.referenced_by()
3310 }
3311
3312 fn used_by(&self) -> &[CatalogItemId] {
3313 self.used_by()
3314 }
3315
3316 fn subsource_details(
3317 &self,
3318 ) -> Option<(CatalogItemId, &UnresolvedItemName, &SourceExportDetails)> {
3319 self.subsource_details()
3320 }
3321
3322 fn source_export_details(
3323 &self,
3324 ) -> Option<(
3325 CatalogItemId,
3326 &UnresolvedItemName,
3327 &SourceExportDetails,
3328 &SourceExportDataConfig<ReferencedConnection>,
3329 )> {
3330 self.source_export_details()
3331 }
3332
3333 fn is_progress_source(&self) -> bool {
3334 self.is_progress_source()
3335 }
3336
3337 fn progress_id(&self) -> Option<CatalogItemId> {
3338 self.progress_id()
3339 }
3340
3341 fn owner_id(&self) -> RoleId {
3342 self.owner_id
3343 }
3344
3345 fn privileges(&self) -> &PrivilegeMap {
3346 &self.privileges
3347 }
3348
3349 fn cluster_id(&self) -> Option<ClusterId> {
3350 self.item().cluster_id()
3351 }
3352
3353 fn at_version(
3354 &self,
3355 version: RelationVersionSelector,
3356 ) -> Box<dyn mz_sql::catalog::CatalogCollectionItem> {
3357 Box::new(CatalogCollectionEntry {
3358 entry: self.clone(),
3359 version,
3360 })
3361 }
3362
3363 fn latest_version(&self) -> Option<RelationVersion> {
3364 self.table().map(|t| t.desc.latest_version())
3365 }
3366}
3367
/// A single update to catalog state: what changed, at which timestamp, and
/// whether it was added or retracted.
#[derive(Debug)]
pub struct StateUpdate {
    /// The object the update concerns.
    pub kind: StateUpdateKind,
    /// The timestamp attached to the update.
    pub ts: Timestamp,
    /// Whether the update adds or retracts `kind`.
    pub diff: StateDiff,
}
3375
/// The kinds of object a [`StateUpdate`] can carry.
///
/// Every variant except `TemporaryItem` and `SystemPrivilege` wraps a type
/// from `durable::objects`.
#[derive(Debug, Clone)]
pub enum StateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    /// An item described by the in-memory [`TemporaryItem`] type rather than
    /// a durable object type.
    TemporaryItem(TemporaryItem),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3404
/// Direction of a [`StateUpdate`]: either a retraction or an addition.
///
/// Variant order matters for the derived `Ord`: `Retraction` sorts before
/// `Addition`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub enum StateDiff {
    /// Converts to and from `Diff::MINUS_ONE`.
    Retraction,
    /// Converts to and from `Diff::ONE`.
    Addition,
}
3411
3412impl From<StateDiff> for Diff {
3413 fn from(diff: StateDiff) -> Self {
3414 match diff {
3415 StateDiff::Retraction => Diff::MINUS_ONE,
3416 StateDiff::Addition => Diff::ONE,
3417 }
3418 }
3419}
3420impl TryFrom<Diff> for StateDiff {
3421 type Error = String;
3422
3423 fn try_from(diff: Diff) -> Result<Self, Self::Error> {
3424 match diff {
3425 Diff::MINUS_ONE => Ok(Self::Retraction),
3426 Diff::ONE => Ok(Self::Addition),
3427 diff => Err(format!("invalid diff {diff}")),
3428 }
3429 }
3430}
3431
/// The state carried by a `StateUpdateKind::TemporaryItem` update.
///
/// It holds the identity, name, definition, owner, and privileges taken from
/// a [`CatalogEntry`].
#[derive(Debug, Clone)]
pub struct TemporaryItem {
    /// Catalog item ID.
    pub id: CatalogItemId,
    /// OID of the item.
    pub oid: u32,
    /// Qualified name of the item.
    pub name: QualifiedItemName,
    /// The item definition itself.
    pub item: CatalogItem,
    /// ID of the owning role.
    pub owner_id: RoleId,
    /// Privileges stored on the item.
    pub privileges: PrivilegeMap,
}
3442
3443impl From<CatalogEntry> for TemporaryItem {
3444 fn from(entry: CatalogEntry) -> Self {
3445 TemporaryItem {
3446 id: entry.id,
3447 oid: entry.oid,
3448 name: entry.name,
3449 item: entry.item,
3450 owner_id: entry.owner_id,
3451 privileges: entry.privileges,
3452 }
3453 }
3454}
3455
/// The subset of [`StateUpdateKind`] that has no `TemporaryItem` variant.
///
/// Every variant here has a same-named counterpart in [`StateUpdateKind`].
/// Unlike that type, this one also derives ordering and equality.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum BootstrapStateUpdateKind {
    Role(durable::objects::Role),
    RoleAuth(durable::objects::RoleAuth),
    Database(durable::objects::Database),
    Schema(durable::objects::Schema),
    DefaultPrivilege(durable::objects::DefaultPrivilege),
    SystemPrivilege(MzAclItem),
    SystemConfiguration(durable::objects::SystemConfiguration),
    Cluster(durable::objects::Cluster),
    NetworkPolicy(durable::objects::NetworkPolicy),
    IntrospectionSourceIndex(durable::objects::IntrospectionSourceIndex),
    ClusterReplica(durable::objects::ClusterReplica),
    SourceReferences(durable::objects::SourceReferences),
    SystemObjectMapping(durable::objects::SystemObjectMapping),
    Item(durable::objects::Item),
    Comment(durable::objects::Comment),
    AuditLog(durable::objects::AuditLog),
    StorageCollectionMetadata(durable::objects::StorageCollectionMetadata),
    UnfinalizedShard(durable::objects::UnfinalizedShard),
}
3479
3480impl From<BootstrapStateUpdateKind> for StateUpdateKind {
3481 fn from(value: BootstrapStateUpdateKind) -> Self {
3482 match value {
3483 BootstrapStateUpdateKind::Role(kind) => StateUpdateKind::Role(kind),
3484 BootstrapStateUpdateKind::RoleAuth(kind) => StateUpdateKind::RoleAuth(kind),
3485 BootstrapStateUpdateKind::Database(kind) => StateUpdateKind::Database(kind),
3486 BootstrapStateUpdateKind::Schema(kind) => StateUpdateKind::Schema(kind),
3487 BootstrapStateUpdateKind::DefaultPrivilege(kind) => {
3488 StateUpdateKind::DefaultPrivilege(kind)
3489 }
3490 BootstrapStateUpdateKind::SystemPrivilege(kind) => {
3491 StateUpdateKind::SystemPrivilege(kind)
3492 }
3493 BootstrapStateUpdateKind::SystemConfiguration(kind) => {
3494 StateUpdateKind::SystemConfiguration(kind)
3495 }
3496 BootstrapStateUpdateKind::SourceReferences(kind) => {
3497 StateUpdateKind::SourceReferences(kind)
3498 }
3499 BootstrapStateUpdateKind::Cluster(kind) => StateUpdateKind::Cluster(kind),
3500 BootstrapStateUpdateKind::NetworkPolicy(kind) => StateUpdateKind::NetworkPolicy(kind),
3501 BootstrapStateUpdateKind::IntrospectionSourceIndex(kind) => {
3502 StateUpdateKind::IntrospectionSourceIndex(kind)
3503 }
3504 BootstrapStateUpdateKind::ClusterReplica(kind) => StateUpdateKind::ClusterReplica(kind),
3505 BootstrapStateUpdateKind::SystemObjectMapping(kind) => {
3506 StateUpdateKind::SystemObjectMapping(kind)
3507 }
3508 BootstrapStateUpdateKind::Item(kind) => StateUpdateKind::Item(kind),
3509 BootstrapStateUpdateKind::Comment(kind) => StateUpdateKind::Comment(kind),
3510 BootstrapStateUpdateKind::AuditLog(kind) => StateUpdateKind::AuditLog(kind),
3511 BootstrapStateUpdateKind::StorageCollectionMetadata(kind) => {
3512 StateUpdateKind::StorageCollectionMetadata(kind)
3513 }
3514 BootstrapStateUpdateKind::UnfinalizedShard(kind) => {
3515 StateUpdateKind::UnfinalizedShard(kind)
3516 }
3517 }
3518 }
3519}
3520
3521impl TryFrom<StateUpdateKind> for BootstrapStateUpdateKind {
3522 type Error = TemporaryItem;
3523
3524 fn try_from(value: StateUpdateKind) -> Result<Self, Self::Error> {
3525 match value {
3526 StateUpdateKind::Role(kind) => Ok(BootstrapStateUpdateKind::Role(kind)),
3527 StateUpdateKind::RoleAuth(kind) => Ok(BootstrapStateUpdateKind::RoleAuth(kind)),
3528 StateUpdateKind::Database(kind) => Ok(BootstrapStateUpdateKind::Database(kind)),
3529 StateUpdateKind::Schema(kind) => Ok(BootstrapStateUpdateKind::Schema(kind)),
3530 StateUpdateKind::DefaultPrivilege(kind) => {
3531 Ok(BootstrapStateUpdateKind::DefaultPrivilege(kind))
3532 }
3533 StateUpdateKind::SystemPrivilege(kind) => {
3534 Ok(BootstrapStateUpdateKind::SystemPrivilege(kind))
3535 }
3536 StateUpdateKind::SystemConfiguration(kind) => {
3537 Ok(BootstrapStateUpdateKind::SystemConfiguration(kind))
3538 }
3539 StateUpdateKind::Cluster(kind) => Ok(BootstrapStateUpdateKind::Cluster(kind)),
3540 StateUpdateKind::NetworkPolicy(kind) => {
3541 Ok(BootstrapStateUpdateKind::NetworkPolicy(kind))
3542 }
3543 StateUpdateKind::IntrospectionSourceIndex(kind) => {
3544 Ok(BootstrapStateUpdateKind::IntrospectionSourceIndex(kind))
3545 }
3546 StateUpdateKind::ClusterReplica(kind) => {
3547 Ok(BootstrapStateUpdateKind::ClusterReplica(kind))
3548 }
3549 StateUpdateKind::SourceReferences(kind) => {
3550 Ok(BootstrapStateUpdateKind::SourceReferences(kind))
3551 }
3552 StateUpdateKind::SystemObjectMapping(kind) => {
3553 Ok(BootstrapStateUpdateKind::SystemObjectMapping(kind))
3554 }
3555 StateUpdateKind::TemporaryItem(kind) => Err(kind),
3556 StateUpdateKind::Item(kind) => Ok(BootstrapStateUpdateKind::Item(kind)),
3557 StateUpdateKind::Comment(kind) => Ok(BootstrapStateUpdateKind::Comment(kind)),
3558 StateUpdateKind::AuditLog(kind) => Ok(BootstrapStateUpdateKind::AuditLog(kind)),
3559 StateUpdateKind::StorageCollectionMetadata(kind) => {
3560 Ok(BootstrapStateUpdateKind::StorageCollectionMetadata(kind))
3561 }
3562 StateUpdateKind::UnfinalizedShard(kind) => {
3563 Ok(BootstrapStateUpdateKind::UnfinalizedShard(kind))
3564 }
3565 }
3566 }
3567}