1use mz_controller_types::{ClusterId, ReplicaId};
16use mz_repr::role_id::RoleId;
17use mz_repr::{CatalogItemId, GlobalId};
18use mz_sql::catalog::{CatalogItem, DefaultPrivilegeObject};
19use mz_sql::names::{
20 CommentObjectId, DatabaseId, QualifiedItemName, ResolvedDatabaseSpecifier, SchemaId,
21 SchemaSpecifier,
22};
23use mz_sql_parser::ast::{self, Statement};
24use mz_sql_parser::parser::ParserStatementError;
25use serde::Serialize;
26
27use super::CatalogState;
29
/// All inconsistencies found by [`CatalogState::check_consistency`], grouped by the check that
/// detected them.
///
/// Field order is significant for the derived `Serialize` and `Debug` output.
#[derive(Debug, Default, Clone, Serialize, PartialEq)]
pub struct CatalogInconsistencies {
    /// Lookup maps in `CatalogState` that point at IDs missing from the maps they index into.
    internal_fields: Vec<InternalFieldsInconsistency>,
    /// References to roles that are not present in `roles_by_id`.
    roles: Vec<RoleInconsistency>,
    /// Comments on objects that do not exist, or column comments on items without columns.
    comments: Vec<CommentInconsistency>,
    /// Dependency edges between items that are dangling or not recorded on both sides.
    object_dependencies: Vec<ObjectDependencyInconsistency>,
    /// Items whose stored names or `create_sql` disagree with where they are stored.
    items: Vec<ItemInconsistency>,
}
43
44impl CatalogInconsistencies {
45 pub fn is_empty(&self) -> bool {
46 let CatalogInconsistencies {
47 internal_fields,
48 roles,
49 comments,
50 object_dependencies,
51 items,
52 } = self;
53 internal_fields.is_empty()
54 && roles.is_empty()
55 && comments.is_empty()
56 && object_dependencies.is_empty()
57 && items.is_empty()
58 }
59}
60
61impl CatalogState {
62 pub fn check_consistency(&self) -> Result<(), Box<CatalogInconsistencies>> {
64 let mut inconsistencies = CatalogInconsistencies::default();
65
66 if let Err(internal_fields) = self.check_internal_fields() {
67 inconsistencies.internal_fields = internal_fields;
68 }
69 if let Err(roles) = self.check_roles() {
70 inconsistencies.roles = roles;
71 }
72 if let Err(comments) = self.check_comments() {
73 inconsistencies.comments = comments;
74 }
75 if let Err(dependencies) = self.check_object_dependencies() {
76 inconsistencies.object_dependencies = dependencies;
77 }
78 if let Err(items) = self.check_items() {
79 inconsistencies.items = items;
80 }
81
82 if inconsistencies.is_empty() {
83 Ok(())
84 } else {
85 Err(Box::new(inconsistencies))
86 }
87 }
88
89 fn check_internal_fields(&self) -> Result<(), Vec<InternalFieldsInconsistency>> {
96 let mut inconsistencies = Vec::new();
97 for (name, id) in &self.database_by_name {
98 if !self.database_by_id.contains_key(id) {
99 inconsistencies.push(InternalFieldsInconsistency::Database(name.clone(), *id));
100 }
101 }
102 for (name, id) in &self.ambient_schemas_by_name {
103 if !self.ambient_schemas_by_id.contains_key(id) {
104 inconsistencies.push(InternalFieldsInconsistency::AmbientSchema(
105 name.clone(),
106 *id,
107 ));
108 }
109 }
110 for (name, id) in &self.clusters_by_name {
111 if !self.clusters_by_id.contains_key(id) {
112 inconsistencies.push(InternalFieldsInconsistency::Cluster(name.clone(), *id));
113 }
114 }
115 for (name, role_id) in &self.roles_by_name {
116 if !self.roles_by_id.contains_key(role_id) {
117 inconsistencies.push(InternalFieldsInconsistency::Role(name.clone(), *role_id))
118 }
119 }
120
121 for (source_id, _references) in &self.source_references {
122 if !self.entry_by_id.contains_key(source_id) {
123 inconsistencies.push(InternalFieldsInconsistency::SourceReferences(*source_id));
124 }
125 }
126
127 for (item_id, entry) in &self.entry_by_id {
128 let missing_gids: Vec<_> = entry
129 .global_ids()
130 .filter(|gid| !self.entry_by_global_id.contains_key(gid))
131 .collect();
132 if !missing_gids.is_empty() {
133 inconsistencies.push(InternalFieldsInconsistency::EntryMissingGlobalIds(
134 *item_id,
135 missing_gids,
136 ));
137 }
138 }
139 for (gid, item_id) in &self.entry_by_global_id {
140 if !self.entry_by_id.contains_key(item_id) {
141 inconsistencies.push(InternalFieldsInconsistency::GlobalIdsMissingEntry(
142 *gid, *item_id,
143 ));
144 }
145 }
146
147 if inconsistencies.is_empty() {
148 Ok(())
149 } else {
150 Err(inconsistencies)
151 }
152 }
153
154 fn check_roles(&self) -> Result<(), Vec<RoleInconsistency>> {
159 let mut inconsistencies = Vec::new();
160 for (database_id, database) in &self.database_by_id {
161 if !self.roles_by_id.contains_key(&database.owner_id) {
162 inconsistencies.push(RoleInconsistency::Database(*database_id, database.owner_id));
163 }
164 for (schema_id, schema) in &database.schemas_by_id {
165 if !self.roles_by_id.contains_key(&schema.owner_id) {
166 inconsistencies.push(RoleInconsistency::Schema(*schema_id, schema.owner_id));
167 }
168 }
169 }
170 for (item_id, entry) in &self.entry_by_id {
171 if !self.roles_by_id.contains_key(entry.owner_id()) {
172 inconsistencies.push(RoleInconsistency::Entry(*item_id, entry.owner_id().clone()));
173 }
174 }
175 for (cluster_id, cluster) in &self.clusters_by_id {
176 if !self.roles_by_id.contains_key(&cluster.owner_id) {
177 inconsistencies.push(RoleInconsistency::Cluster(*cluster_id, cluster.owner_id));
178 }
179 for replica in cluster.replicas() {
180 if !self.roles_by_id.contains_key(&replica.owner_id) {
181 inconsistencies.push(RoleInconsistency::ClusterReplica(
182 *cluster_id,
183 replica.replica_id,
184 cluster.owner_id,
185 ));
186 }
187 }
188 }
189 for (role_id, _) in &self.role_auth_by_id {
190 if !self.roles_by_id.contains_key(role_id) {
191 inconsistencies.push(RoleInconsistency::RoleAuth(role_id.clone()));
192 }
193 }
194 for (default_priv, privileges) in self.default_privileges.iter() {
195 if !self.roles_by_id.contains_key(&default_priv.role_id) {
196 inconsistencies.push(RoleInconsistency::DefaultPrivilege(default_priv.clone()));
197 }
198 for acl_item in privileges {
199 if !self.roles_by_id.contains_key(&acl_item.grantee) {
200 inconsistencies.push(RoleInconsistency::DefaultPrivilegeItem {
201 grantor: default_priv.role_id,
202 grantee: acl_item.grantee,
203 });
204 }
205 }
206 }
207 for acl in self.system_privileges.all_values() {
208 let grantor = self.roles_by_id.get(&acl.grantor);
209 let grantee = self.roles_by_id.get(&acl.grantee);
210
211 let inconsistency = match (grantor, grantee) {
212 (None, None) => RoleInconsistency::SystemPrivilege {
213 grantor: Some(acl.grantor),
214 grantee: Some(acl.grantee),
215 },
216 (Some(_), None) => RoleInconsistency::SystemPrivilege {
217 grantor: None,
218 grantee: Some(acl.grantee),
219 },
220 (None, Some(_)) => RoleInconsistency::SystemPrivilege {
221 grantor: Some(acl.grantor),
222 grantee: None,
223 },
224 (Some(_), Some(_)) => continue,
225 };
226 inconsistencies.push(inconsistency);
227 }
228 for role in self.roles_by_id.values() {
229 for (parent_id, grantor_id) in &role.membership.map {
230 let parent = self.roles_by_id.get(parent_id);
231 let grantor = self.roles_by_id.get(grantor_id);
232 let inconsistency = match (parent, grantor) {
233 (None, None) => RoleInconsistency::Membership {
234 parent: Some(*parent_id),
235 grantor: Some(*grantor_id),
236 },
237 (Some(_), None) => RoleInconsistency::Membership {
238 parent: None,
239 grantor: Some(*grantor_id),
240 },
241 (None, Some(_)) => RoleInconsistency::Membership {
242 parent: Some(*parent_id),
243 grantor: None,
244 },
245 (Some(_), Some(_)) => continue,
246 };
247 inconsistencies.push(inconsistency);
248 }
249 }
250
251 if inconsistencies.is_empty() {
252 Ok(())
253 } else {
254 Err(inconsistencies)
255 }
256 }
257
258 fn check_comments(&self) -> Result<(), Vec<CommentInconsistency>> {
264 let mut comment_inconsistencies = Vec::new();
265 for (comment_object_id, col_pos, _comment) in self.comments.iter() {
266 match comment_object_id {
267 CommentObjectId::Table(item_id)
268 | CommentObjectId::View(item_id)
269 | CommentObjectId::MaterializedView(item_id)
270 | CommentObjectId::Source(item_id)
271 | CommentObjectId::Sink(item_id)
272 | CommentObjectId::Index(item_id)
273 | CommentObjectId::Func(item_id)
274 | CommentObjectId::Connection(item_id)
275 | CommentObjectId::Type(item_id)
276 | CommentObjectId::Secret(item_id)
277 | CommentObjectId::ContinualTask(item_id) => {
278 let entry = self.entry_by_id.get(&item_id);
279 match entry {
280 None => comment_inconsistencies
281 .push(CommentInconsistency::Dangling(comment_object_id)),
282 Some(entry) => {
283 #[allow(clippy::unnecessary_unwrap)]
285 if !entry.has_columns() && col_pos.is_some() {
286 let col_pos = col_pos.expect("checked above");
287 comment_inconsistencies.push(CommentInconsistency::NonRelation(
288 comment_object_id,
289 col_pos,
290 ));
291 }
292 }
293 }
294 }
295 CommentObjectId::NetworkPolicy(network_policy_id) => {
296 if !self.network_policies_by_id.contains_key(&network_policy_id) {
297 comment_inconsistencies
298 .push(CommentInconsistency::Dangling(comment_object_id));
299 }
300 }
301
302 CommentObjectId::Role(role_id) => {
303 if !self.roles_by_id.contains_key(&role_id) {
304 comment_inconsistencies
305 .push(CommentInconsistency::Dangling(comment_object_id));
306 }
307 }
308 CommentObjectId::Database(database_id) => {
309 if !self.database_by_id.contains_key(&database_id) {
310 comment_inconsistencies
311 .push(CommentInconsistency::Dangling(comment_object_id));
312 }
313 }
314 CommentObjectId::Schema((database, schema)) => {
315 match (database, schema) {
316 (
317 ResolvedDatabaseSpecifier::Id(database_id),
318 SchemaSpecifier::Id(schema_id),
319 ) => {
320 let schema = self
321 .database_by_id
322 .get(&database_id)
323 .and_then(|database| database.schemas_by_id.get(&schema_id));
324 if schema.is_none() {
325 comment_inconsistencies
326 .push(CommentInconsistency::Dangling(comment_object_id));
327 }
328 }
329 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(schema_id)) => {
330 if !self.ambient_schemas_by_id.contains_key(&schema_id) {
331 comment_inconsistencies
332 .push(CommentInconsistency::Dangling(comment_object_id));
333 }
334 }
335 (ResolvedDatabaseSpecifier::Id(_id), SchemaSpecifier::Temporary) => (),
337 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => (),
339 }
340 }
341 CommentObjectId::Cluster(cluster_id) => {
342 if !self.clusters_by_id.contains_key(&cluster_id) {
343 comment_inconsistencies
344 .push(CommentInconsistency::Dangling(comment_object_id));
345 }
346 }
347 CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
348 let replica = self
349 .clusters_by_id
350 .get(&cluster_id)
351 .and_then(|cluster| cluster.replica(replica_id));
352 if replica.is_none() {
353 comment_inconsistencies
354 .push(CommentInconsistency::Dangling(comment_object_id));
355 }
356 }
357 }
358 }
359
360 if comment_inconsistencies.is_empty() {
361 Ok(())
362 } else {
363 Err(comment_inconsistencies)
364 }
365 }
366
367 fn check_object_dependencies(&self) -> Result<(), Vec<ObjectDependencyInconsistency>> {
375 let mut dependency_inconsistencies = vec![];
376
377 for (id, entry) in &self.entry_by_id {
378 for referenced_id in entry.references().items() {
379 let Some(referenced_entry) = self.entry_by_id.get(referenced_id) else {
380 dependency_inconsistencies.push(ObjectDependencyInconsistency::MissingUses {
381 object_a: *id,
382 object_b: *referenced_id,
383 });
384 continue;
385 };
386 if !referenced_entry.referenced_by().contains(id)
387 && (referenced_entry.id() != *id && !referenced_entry.is_continual_task())
389 {
390 dependency_inconsistencies.push(
391 ObjectDependencyInconsistency::InconsistentUsedBy {
392 object_a: *id,
393 object_b: *referenced_id,
394 },
395 );
396 }
397 }
398 for used_id in entry.uses() {
399 let Some(used_entry) = self.entry_by_id.get(&used_id) else {
400 dependency_inconsistencies.push(ObjectDependencyInconsistency::MissingUses {
401 object_a: *id,
402 object_b: used_id,
403 });
404 continue;
405 };
406 if !used_entry.used_by().contains(id)
407 && (used_entry.id() != *id && !used_entry.is_continual_task())
409 {
410 dependency_inconsistencies.push(
411 ObjectDependencyInconsistency::InconsistentUsedBy {
412 object_a: *id,
413 object_b: used_id,
414 },
415 );
416 }
417 }
418
419 for referenced_by in entry.referenced_by() {
420 let Some(referenced_by_entry) = self.entry_by_id.get(referenced_by) else {
421 dependency_inconsistencies.push(ObjectDependencyInconsistency::MissingUsedBy {
422 object_a: *id,
423 object_b: *referenced_by,
424 });
425 continue;
426 };
427 if !referenced_by_entry.references().contains_item(id) {
428 dependency_inconsistencies.push(
429 ObjectDependencyInconsistency::InconsistentUses {
430 object_a: *id,
431 object_b: *referenced_by,
432 },
433 );
434 }
435 }
436 for used_by in entry.used_by() {
437 let Some(used_by_entry) = self.entry_by_id.get(used_by) else {
438 dependency_inconsistencies.push(ObjectDependencyInconsistency::MissingUsedBy {
439 object_a: *id,
440 object_b: *used_by,
441 });
442 continue;
443 };
444 if !used_by_entry.uses().contains(id) {
445 dependency_inconsistencies.push(
446 ObjectDependencyInconsistency::InconsistentUses {
447 object_a: *id,
448 object_b: *used_by,
449 },
450 );
451 }
452 }
453 }
454
455 if dependency_inconsistencies.is_empty() {
456 Ok(())
457 } else {
458 Err(dependency_inconsistencies)
459 }
460 }
461
462 fn check_items(&self) -> Result<(), Vec<ItemInconsistency>> {
474 let mut item_inconsistencies = vec![];
475
476 for (db_id, db) in &self.database_by_id {
477 for (schema_name, schema_id) in &db.schemas_by_name {
478 let Some(schema) = db.schemas_by_id.get(schema_id) else {
480 item_inconsistencies.push(ItemInconsistency::MissingSchema {
481 db_id: *db_id,
482 schema_name: schema_name.clone(),
483 });
484 continue;
485 };
486 if schema_name != &schema.name.schema {
487 item_inconsistencies.push(ItemInconsistency::KeyedName {
488 db_schema_by_name: schema_name.clone(),
489 struct_name: schema.name.schema.clone(),
490 });
491 }
492
493 for (item_name, item_id) in &schema.items {
495 let Some(entry) = self.entry_by_id.get(item_id) else {
496 item_inconsistencies.push(ItemInconsistency::NonExistentItem {
497 db_id: *db_id,
498 schema_id: schema.id,
499 item_id: *item_id,
500 });
501 continue;
502 };
503 if item_name != &entry.name().item {
504 item_inconsistencies.push(ItemInconsistency::ItemNameMismatch {
505 item_id: *item_id,
506 map_name: item_name.clone(),
507 entry_name: entry.name().clone(),
508 });
509 }
510 let statement = match mz_sql::parse::parse(entry.create_sql()) {
511 Ok(mut statements) if statements.len() == 1 => {
512 let statement = statements.pop().expect("checked length");
513 statement.ast
514 }
515 Ok(_) => {
516 item_inconsistencies.push(ItemInconsistency::MultiCreateStatement {
517 create_sql: entry.create_sql().to_string(),
518 });
519 continue;
520 }
521 Err(e) => {
522 item_inconsistencies.push(ItemInconsistency::StatementParseFailure {
523 create_sql: entry.create_sql().to_string(),
524 e,
525 });
526 continue;
527 }
528 };
529 match statement {
530 Statement::CreateConnection(ast::CreateConnectionStatement {
531 name,
532 ..
533 })
534 | Statement::CreateWebhookSource(ast::CreateWebhookSourceStatement {
535 name,
536 ..
537 })
538 | Statement::CreateSource(ast::CreateSourceStatement { name, .. })
539 | Statement::CreateSubsource(ast::CreateSubsourceStatement {
540 name, ..
541 })
542 | Statement::CreateSink(ast::CreateSinkStatement {
543 name: Some(name),
544 ..
545 })
546 | Statement::CreateView(ast::CreateViewStatement {
547 definition: ast::ViewDefinition { name, .. },
548 ..
549 })
550 | Statement::CreateMaterializedView(
551 ast::CreateMaterializedViewStatement { name, .. },
552 )
553 | Statement::CreateTable(ast::CreateTableStatement { name, .. })
554 | Statement::CreateType(ast::CreateTypeStatement { name, .. })
555 | Statement::CreateSecret(ast::CreateSecretStatement { name, .. }) => {
556 let [db_component, schema_component, item_component] = &name.0[..]
557 else {
558 let name =
559 name.0.into_iter().map(|ident| ident.to_string()).collect();
560 item_inconsistencies.push(
561 ItemInconsistency::NonFullyQualifiedItemName {
562 create_sql: entry.create_sql().to_string(),
563 name,
564 },
565 );
566 continue;
567 };
568 if db_component.as_str() != &db.name
569 || schema_component.as_str() != &schema.name.schema
570 || item_component.as_str() != &entry.name().item
571 {
572 item_inconsistencies.push(
573 ItemInconsistency::CreateSqlItemNameMismatch {
574 item_name: vec![
575 db.name.clone(),
576 schema.name.schema.clone(),
577 entry.name().item.clone(),
578 ],
579 create_sql: entry.create_sql().to_string(),
580 },
581 );
582 }
583 }
584 Statement::CreateSchema(ast::CreateSchemaStatement { name, .. }) => {
585 let [db_component, schema_component] = &name.0[..] else {
586 let name =
587 name.0.into_iter().map(|ident| ident.to_string()).collect();
588 item_inconsistencies.push(
589 ItemInconsistency::NonFullyQualifiedSchemaName {
590 create_sql: entry.create_sql().to_string(),
591 name,
592 },
593 );
594 continue;
595 };
596 if db_component.as_str() != &db.name
597 || schema_component.as_str() != &schema.name.schema
598 {
599 item_inconsistencies.push(
600 ItemInconsistency::CreateSqlSchemaNameMismatch {
601 schema_name: vec![
602 db.name.clone(),
603 schema.name.schema.clone(),
604 ],
605 create_sql: entry.create_sql().to_string(),
606 },
607 );
608 }
609 }
610 Statement::CreateDatabase(ast::CreateDatabaseStatement {
611 name, ..
612 }) => {
613 if db.name != name.0.as_str() {
614 item_inconsistencies.push(
615 ItemInconsistency::CreateSqlDatabaseNameMismatch {
616 database_name: db.name.clone(),
617 create_sql: entry.create_sql().to_string(),
618 },
619 );
620 }
621 }
622 _ => (),
623 }
624 }
625 }
626 }
627
628 if item_inconsistencies.is_empty() {
629 Ok(())
630 } else {
631 Err(item_inconsistencies)
632 }
633 }
634}
635
/// A `CatalogState` lookup map that disagrees with the map it indexes into.
///
/// Produced by `CatalogState::check_internal_fields`.
#[derive(Debug, Serialize, Clone, PartialEq)]
enum InternalFieldsInconsistency {
    /// A name in `database_by_name` maps to an ID absent from `database_by_id`.
    Database(String, DatabaseId),
    /// A name in `ambient_schemas_by_name` maps to an ID absent from `ambient_schemas_by_id`.
    AmbientSchema(String, SchemaId),
    /// A name in `clusters_by_name` maps to an ID absent from `clusters_by_id`.
    Cluster(String, ClusterId),
    /// A name in `roles_by_name` maps to an ID absent from `roles_by_id`.
    Role(String, RoleId),
    /// A `source_references` key that is absent from `entry_by_id`.
    SourceReferences(CatalogItemId),
    /// An entry together with those of its global IDs that are absent from `entry_by_global_id`.
    EntryMissingGlobalIds(CatalogItemId, Vec<GlobalId>),
    /// A global ID in `entry_by_global_id` whose item ID is absent from `entry_by_id`.
    GlobalIdsMissingEntry(GlobalId, CatalogItemId),
}
646
/// A reference to a role that is not present in `roles_by_id`.
///
/// Produced by `CatalogState::check_roles`.
#[derive(Debug, Serialize, Clone, PartialEq)]
enum RoleInconsistency {
    /// A database whose owner role does not exist.
    Database(DatabaseId, RoleId),
    /// A schema whose owner role does not exist.
    Schema(SchemaId, RoleId),
    /// A catalog item whose owner role does not exist.
    Entry(CatalogItemId, RoleId),
    /// A cluster whose owner role does not exist.
    Cluster(ClusterId, RoleId),
    /// A cluster replica whose owner role does not exist.
    ///
    /// Holds the cluster ID, the replica ID, and a role ID.
    ClusterReplica(ClusterId, ReplicaId, RoleId),
    /// A default privilege whose `role_id` does not exist.
    DefaultPrivilege(DefaultPrivilegeObject),
    /// A role-auth record keyed by a role that does not exist.
    RoleAuth(RoleId),
    /// A default-privilege grant whose grantee does not exist.
    ///
    /// `grantor` holds the `role_id` of the default privilege the grant belongs to.
    DefaultPrivilegeItem {
        grantor: RoleId,
        grantee: RoleId,
    },
    /// A system privilege whose grantor and/or grantee does not exist.
    ///
    /// Only the missing side(s) are `Some`.
    SystemPrivilege {
        grantor: Option<RoleId>,
        grantee: Option<RoleId>,
    },
    /// A role membership whose parent role and/or grantor does not exist.
    ///
    /// Only the missing side(s) are `Some`.
    Membership {
        parent: Option<RoleId>,
        grantor: Option<RoleId>,
    },
}
669
/// A comment that cannot be attached to its target.
///
/// Produced by `CatalogState::check_comments`.
#[derive(Debug, Serialize, Clone, PartialEq)]
enum CommentInconsistency {
    /// The commented-on object does not exist.
    Dangling(CommentObjectId),
    /// A comment at the given column position on an item that has no columns.
    NonRelation(CommentObjectId, usize),
}
677
/// A dependency edge between two catalog items that is dangling or one-sided.
///
/// Produced by `CatalogState::check_object_dependencies`.
#[derive(Debug, Serialize, Clone, PartialEq)]
enum ObjectDependencyInconsistency {
    /// `object_a` references or uses `object_b`, but `object_b` does not exist.
    MissingUses {
        object_a: CatalogItemId,
        object_b: CatalogItemId,
    },
    /// `object_a` lists `object_b` in its `referenced_by`/`used_by`, but `object_b` does not
    /// exist.
    MissingUsedBy {
        object_a: CatalogItemId,
        object_b: CatalogItemId,
    },
    /// `object_a` references or uses `object_b`, but `object_b`'s `referenced_by`/`used_by`
    /// does not list `object_a`.
    InconsistentUsedBy {
        object_a: CatalogItemId,
        object_b: CatalogItemId,
    },
    /// `object_a` lists `object_b` in its `referenced_by`/`used_by`, but `object_b`'s
    /// `references`/`uses` does not include `object_a`.
    InconsistentUses {
        object_a: CatalogItemId,
        object_b: CatalogItemId,
    },
}
701
/// A catalog item or schema whose stored names or `create_sql` disagree with its location.
///
/// Produced by `CatalogState::check_items`.
#[derive(Debug, Serialize, Clone, PartialEq)]
enum ItemInconsistency {
    /// The key in a database's `schemas_by_name` differs from the schema's stored name.
    KeyedName {
        /// The key used in `schemas_by_name`.
        db_schema_by_name: String,
        /// The name stored on the schema itself.
        struct_name: String,
    },
    /// A name in a database's `schemas_by_name` maps to an ID absent from `schemas_by_id`.
    MissingSchema {
        db_id: DatabaseId,
        schema_name: String,
    },
    /// A schema's item map contains an item ID absent from `entry_by_id`.
    NonExistentItem {
        db_id: DatabaseId,
        schema_id: SchemaSpecifier,
        item_id: CatalogItemId,
    },
    /// The key in a schema's item map differs from the entry's item name.
    ItemNameMismatch {
        item_id: CatalogItemId,
        /// The key used in the schema's item map.
        map_name: String,
        /// The qualified name stored on the entry.
        entry_name: QualifiedItemName,
    },
    /// The item's `create_sql` failed to parse.
    StatementParseFailure {
        create_sql: String,
        e: ParserStatementError,
    },
    /// The item's `create_sql` parsed to something other than exactly one statement.
    MultiCreateStatement { create_sql: String },
    /// The item name in `create_sql` does not have exactly three (database, schema, item)
    /// components.
    NonFullyQualifiedItemName {
        create_sql: String,
        name: Vec<String>,
    },
    /// The schema name in `create_sql` does not have exactly two (database, schema)
    /// components.
    NonFullyQualifiedSchemaName {
        create_sql: String,
        name: Vec<String>,
    },
    /// The item name in `create_sql` differs from where the item is stored.
    CreateSqlItemNameMismatch {
        /// The expected `[database, schema, item]` name from the catalog.
        item_name: Vec<String>,
        create_sql: String,
    },
    /// The schema name in `create_sql` differs from where the schema is stored.
    CreateSqlSchemaNameMismatch {
        /// The expected `[database, schema]` name from the catalog.
        schema_name: Vec<String>,
        create_sql: String,
    },
    /// The database name in `create_sql` differs from the catalog's database name.
    CreateSqlDatabaseNameMismatch {
        /// The expected database name from the catalog.
        database_name: String,
        create_sql: String,
    },
}