1use mz_controller_types::{ClusterId, ReplicaId};
16use mz_repr::role_id::RoleId;
17use mz_repr::{CatalogItemId, GlobalId};
18use mz_sql::catalog::{CatalogItem, DefaultPrivilegeObject};
19use mz_sql::names::{
20 CommentObjectId, DatabaseId, QualifiedItemName, ResolvedDatabaseSpecifier, SchemaId,
21 SchemaSpecifier,
22};
23use mz_sql_parser::ast::{self, Statement};
24use mz_sql_parser::parser::ParserStatementError;
25use serde::Serialize;
26
27use super::CatalogState;
29
/// The aggregated result of `CatalogState::check_consistency`: one list of
/// findings per category of check. A list is empty when the corresponding
/// check found nothing.
#[derive(Debug, Default, Clone, Serialize, PartialEq)]
pub struct CatalogInconsistencies {
    /// Findings reported by `CatalogState::check_internal_fields`.
    internal_fields: Vec<InternalFieldsInconsistency>,
    /// Findings reported by `CatalogState::check_roles`.
    roles: Vec<RoleInconsistency>,
    /// Findings reported by `CatalogState::check_comments`.
    comments: Vec<CommentInconsistency>,
    /// Findings reported by `CatalogState::check_object_dependencies`.
    object_dependencies: Vec<ObjectDependencyInconsistency>,
    /// Findings reported by `CatalogState::check_items`.
    items: Vec<ItemInconsistency>,
}
43
44impl CatalogInconsistencies {
45 pub fn is_empty(&self) -> bool {
46 let CatalogInconsistencies {
47 internal_fields,
48 roles,
49 comments,
50 object_dependencies,
51 items,
52 } = self;
53 internal_fields.is_empty()
54 && roles.is_empty()
55 && comments.is_empty()
56 && object_dependencies.is_empty()
57 && items.is_empty()
58 }
59}
60
61impl CatalogState {
62 pub fn check_consistency(&self) -> Result<(), Box<CatalogInconsistencies>> {
64 let mut inconsistencies = CatalogInconsistencies::default();
65
66 if let Err(internal_fields) = self.check_internal_fields() {
67 inconsistencies.internal_fields = internal_fields;
68 }
69 if let Err(roles) = self.check_roles() {
70 inconsistencies.roles = roles;
71 }
72 if let Err(comments) = self.check_comments() {
73 inconsistencies.comments = comments;
74 }
75 if let Err(dependencies) = self.check_object_dependencies() {
76 inconsistencies.object_dependencies = dependencies;
77 }
78 if let Err(items) = self.check_items() {
79 inconsistencies.items = items;
80 }
81
82 if inconsistencies.is_empty() {
83 Ok(())
84 } else {
85 Err(Box::new(inconsistencies))
86 }
87 }
88
89 fn check_internal_fields(&self) -> Result<(), Vec<InternalFieldsInconsistency>> {
96 let mut inconsistencies = Vec::new();
97 for (name, id) in &self.database_by_name {
98 if !self.database_by_id.contains_key(id) {
99 inconsistencies.push(InternalFieldsInconsistency::Database(name.clone(), *id));
100 }
101 }
102 for (name, id) in &self.ambient_schemas_by_name {
103 if !self.ambient_schemas_by_id.contains_key(id) {
104 inconsistencies.push(InternalFieldsInconsistency::AmbientSchema(
105 name.clone(),
106 *id,
107 ));
108 }
109 }
110 for (name, id) in &self.clusters_by_name {
111 if !self.clusters_by_id.contains_key(id) {
112 inconsistencies.push(InternalFieldsInconsistency::Cluster(name.clone(), *id));
113 }
114 }
115 for (name, role_id) in &self.roles_by_name {
116 if !self.roles_by_id.contains_key(role_id) {
117 inconsistencies.push(InternalFieldsInconsistency::Role(name.clone(), *role_id))
118 }
119 }
120
121 for (source_id, _references) in &self.source_references {
122 if !self.entry_by_id.contains_key(source_id) {
123 inconsistencies.push(InternalFieldsInconsistency::SourceReferences(*source_id));
124 }
125 }
126
127 for (item_id, entry) in &self.entry_by_id {
128 let missing_gids: Vec<_> = entry
129 .global_ids()
130 .filter(|gid| !self.entry_by_global_id.contains_key(gid))
131 .collect();
132 if !missing_gids.is_empty() {
133 inconsistencies.push(InternalFieldsInconsistency::EntryMissingGlobalIds(
134 *item_id,
135 missing_gids,
136 ));
137 }
138 }
139 for (gid, item_id) in &self.entry_by_global_id {
140 if !self.entry_by_id.contains_key(item_id) {
141 inconsistencies.push(InternalFieldsInconsistency::GlobalIdsMissingEntry(
142 *gid, *item_id,
143 ));
144 }
145 }
146
147 if inconsistencies.is_empty() {
148 Ok(())
149 } else {
150 Err(inconsistencies)
151 }
152 }
153
154 fn check_roles(&self) -> Result<(), Vec<RoleInconsistency>> {
159 let mut inconsistencies = Vec::new();
160 for (database_id, database) in &self.database_by_id {
161 if !self.roles_by_id.contains_key(&database.owner_id) {
162 inconsistencies.push(RoleInconsistency::Database(*database_id, database.owner_id));
163 }
164 for (schema_id, schema) in &database.schemas_by_id {
165 if !self.roles_by_id.contains_key(&schema.owner_id) {
166 inconsistencies.push(RoleInconsistency::Schema(*schema_id, schema.owner_id));
167 }
168 }
169 }
170 for (item_id, entry) in &self.entry_by_id {
171 if !self.roles_by_id.contains_key(entry.owner_id()) {
172 inconsistencies.push(RoleInconsistency::Entry(*item_id, entry.owner_id().clone()));
173 }
174 }
175 for (cluster_id, cluster) in &self.clusters_by_id {
176 if !self.roles_by_id.contains_key(&cluster.owner_id) {
177 inconsistencies.push(RoleInconsistency::Cluster(*cluster_id, cluster.owner_id));
178 }
179 for replica in cluster.replicas() {
180 if !self.roles_by_id.contains_key(&replica.owner_id) {
181 inconsistencies.push(RoleInconsistency::ClusterReplica(
182 *cluster_id,
183 replica.replica_id,
184 cluster.owner_id,
185 ));
186 }
187 }
188 }
189 for (role_id, _) in &self.role_auth_by_id {
190 if !self.roles_by_id.contains_key(role_id) {
191 inconsistencies.push(RoleInconsistency::RoleAuth(role_id.clone()));
192 }
193 }
194 for (default_priv, privileges) in self.default_privileges.iter() {
195 if !self.roles_by_id.contains_key(&default_priv.role_id) {
196 inconsistencies.push(RoleInconsistency::DefaultPrivilege(default_priv.clone()));
197 }
198 for acl_item in privileges {
199 if !self.roles_by_id.contains_key(&acl_item.grantee) {
200 inconsistencies.push(RoleInconsistency::DefaultPrivilegeItem {
201 grantor: default_priv.role_id,
202 grantee: acl_item.grantee,
203 });
204 }
205 }
206 }
207 for acl in self.system_privileges.all_values() {
208 let grantor = self.roles_by_id.get(&acl.grantor);
209 let grantee = self.roles_by_id.get(&acl.grantee);
210
211 let inconsistency = match (grantor, grantee) {
212 (None, None) => RoleInconsistency::SystemPrivilege {
213 grantor: Some(acl.grantor),
214 grantee: Some(acl.grantee),
215 },
216 (Some(_), None) => RoleInconsistency::SystemPrivilege {
217 grantor: None,
218 grantee: Some(acl.grantee),
219 },
220 (None, Some(_)) => RoleInconsistency::SystemPrivilege {
221 grantor: Some(acl.grantor),
222 grantee: None,
223 },
224 (Some(_), Some(_)) => continue,
225 };
226 inconsistencies.push(inconsistency);
227 }
228 for role in self.roles_by_id.values() {
229 for (parent_id, grantor_id) in &role.membership.map {
230 let parent = self.roles_by_id.get(parent_id);
231 let grantor = self.roles_by_id.get(grantor_id);
232 let inconsistency = match (parent, grantor) {
233 (None, None) => RoleInconsistency::Membership {
234 parent: Some(*parent_id),
235 grantor: Some(*grantor_id),
236 },
237 (Some(_), None) => RoleInconsistency::Membership {
238 parent: None,
239 grantor: Some(*grantor_id),
240 },
241 (None, Some(_)) => RoleInconsistency::Membership {
242 parent: Some(*parent_id),
243 grantor: None,
244 },
245 (Some(_), Some(_)) => continue,
246 };
247 inconsistencies.push(inconsistency);
248 }
249 }
250
251 if inconsistencies.is_empty() {
252 Ok(())
253 } else {
254 Err(inconsistencies)
255 }
256 }
257
258 fn check_comments(&self) -> Result<(), Vec<CommentInconsistency>> {
264 let mut comment_inconsistencies = Vec::new();
265 for (comment_object_id, col_pos, _comment) in self.comments.iter() {
266 match comment_object_id {
267 CommentObjectId::Table(item_id)
268 | CommentObjectId::View(item_id)
269 | CommentObjectId::MaterializedView(item_id)
270 | CommentObjectId::Source(item_id)
271 | CommentObjectId::Sink(item_id)
272 | CommentObjectId::Index(item_id)
273 | CommentObjectId::Func(item_id)
274 | CommentObjectId::Connection(item_id)
275 | CommentObjectId::Type(item_id)
276 | CommentObjectId::Secret(item_id) => {
277 let entry = self.entry_by_id.get(&item_id);
278 match entry {
279 None => comment_inconsistencies
280 .push(CommentInconsistency::Dangling(comment_object_id)),
281 Some(entry) => {
282 #[allow(clippy::unnecessary_unwrap)]
284 if !entry.has_columns() && col_pos.is_some() {
285 let col_pos = col_pos.expect("checked above");
286 comment_inconsistencies.push(CommentInconsistency::NonRelation(
287 comment_object_id,
288 col_pos,
289 ));
290 }
291 }
292 }
293 }
294 CommentObjectId::NetworkPolicy(network_policy_id) => {
295 if !self.network_policies_by_id.contains_key(&network_policy_id) {
296 comment_inconsistencies
297 .push(CommentInconsistency::Dangling(comment_object_id));
298 }
299 }
300
301 CommentObjectId::Role(role_id) => {
302 if !self.roles_by_id.contains_key(&role_id) {
303 comment_inconsistencies
304 .push(CommentInconsistency::Dangling(comment_object_id));
305 }
306 }
307 CommentObjectId::Database(database_id) => {
308 if !self.database_by_id.contains_key(&database_id) {
309 comment_inconsistencies
310 .push(CommentInconsistency::Dangling(comment_object_id));
311 }
312 }
313 CommentObjectId::Schema((database, schema)) => {
314 match (database, schema) {
315 (
316 ResolvedDatabaseSpecifier::Id(database_id),
317 SchemaSpecifier::Id(schema_id),
318 ) => {
319 let schema = self
320 .database_by_id
321 .get(&database_id)
322 .and_then(|database| database.schemas_by_id.get(&schema_id));
323 if schema.is_none() {
324 comment_inconsistencies
325 .push(CommentInconsistency::Dangling(comment_object_id));
326 }
327 }
328 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(schema_id)) => {
329 if !self.ambient_schemas_by_id.contains_key(&schema_id) {
330 comment_inconsistencies
331 .push(CommentInconsistency::Dangling(comment_object_id));
332 }
333 }
334 (ResolvedDatabaseSpecifier::Id(_id), SchemaSpecifier::Temporary) => (),
336 (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => (),
338 }
339 }
340 CommentObjectId::Cluster(cluster_id) => {
341 if !self.clusters_by_id.contains_key(&cluster_id) {
342 comment_inconsistencies
343 .push(CommentInconsistency::Dangling(comment_object_id));
344 }
345 }
346 CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
347 let replica = self
348 .clusters_by_id
349 .get(&cluster_id)
350 .and_then(|cluster| cluster.replica(replica_id));
351 if replica.is_none() {
352 comment_inconsistencies
353 .push(CommentInconsistency::Dangling(comment_object_id));
354 }
355 }
356 }
357 }
358
359 if comment_inconsistencies.is_empty() {
360 Ok(())
361 } else {
362 Err(comment_inconsistencies)
363 }
364 }
365
366 fn check_object_dependencies(&self) -> Result<(), Vec<ObjectDependencyInconsistency>> {
374 let mut dependency_inconsistencies = vec![];
375
376 for (id, entry) in &self.entry_by_id {
377 for referenced_id in entry.references().items() {
378 let Some(referenced_entry) = self.entry_by_id.get(referenced_id) else {
379 dependency_inconsistencies.push(ObjectDependencyInconsistency::MissingUses {
380 object_a: *id,
381 object_b: *referenced_id,
382 });
383 continue;
384 };
385 if !referenced_entry.referenced_by().contains(id) && referenced_entry.id() != *id {
386 dependency_inconsistencies.push(
387 ObjectDependencyInconsistency::InconsistentUsedBy {
388 object_a: *id,
389 object_b: *referenced_id,
390 },
391 );
392 }
393 }
394 for used_id in entry.uses() {
395 let Some(used_entry) = self.entry_by_id.get(&used_id) else {
396 dependency_inconsistencies.push(ObjectDependencyInconsistency::MissingUses {
397 object_a: *id,
398 object_b: used_id,
399 });
400 continue;
401 };
402 if !used_entry.used_by().contains(id) && used_entry.id() != *id {
403 dependency_inconsistencies.push(
404 ObjectDependencyInconsistency::InconsistentUsedBy {
405 object_a: *id,
406 object_b: used_id,
407 },
408 );
409 }
410 }
411
412 for referenced_by in entry.referenced_by() {
413 let Some(referenced_by_entry) = self.entry_by_id.get(referenced_by) else {
414 dependency_inconsistencies.push(ObjectDependencyInconsistency::MissingUsedBy {
415 object_a: *id,
416 object_b: *referenced_by,
417 });
418 continue;
419 };
420 if !referenced_by_entry.references().contains_item(id) {
421 dependency_inconsistencies.push(
422 ObjectDependencyInconsistency::InconsistentUses {
423 object_a: *id,
424 object_b: *referenced_by,
425 },
426 );
427 }
428 }
429 for used_by in entry.used_by() {
430 let Some(used_by_entry) = self.entry_by_id.get(used_by) else {
431 dependency_inconsistencies.push(ObjectDependencyInconsistency::MissingUsedBy {
432 object_a: *id,
433 object_b: *used_by,
434 });
435 continue;
436 };
437 if !used_by_entry.uses().contains(id) {
438 dependency_inconsistencies.push(
439 ObjectDependencyInconsistency::InconsistentUses {
440 object_a: *id,
441 object_b: *used_by,
442 },
443 );
444 }
445 }
446 }
447
448 if dependency_inconsistencies.is_empty() {
449 Ok(())
450 } else {
451 Err(dependency_inconsistencies)
452 }
453 }
454
455 fn check_items(&self) -> Result<(), Vec<ItemInconsistency>> {
467 let mut item_inconsistencies = vec![];
468
469 for (db_id, db) in &self.database_by_id {
470 for (schema_name, schema_id) in &db.schemas_by_name {
471 let Some(schema) = db.schemas_by_id.get(schema_id) else {
473 item_inconsistencies.push(ItemInconsistency::MissingSchema {
474 db_id: *db_id,
475 schema_name: schema_name.clone(),
476 });
477 continue;
478 };
479 if schema_name != &schema.name.schema {
480 item_inconsistencies.push(ItemInconsistency::KeyedName {
481 db_schema_by_name: schema_name.clone(),
482 struct_name: schema.name.schema.clone(),
483 });
484 }
485
486 for (item_name, item_id) in &schema.items {
488 let Some(entry) = self.entry_by_id.get(item_id) else {
489 item_inconsistencies.push(ItemInconsistency::NonExistentItem {
490 db_id: *db_id,
491 schema_id: schema.id,
492 item_id: *item_id,
493 });
494 continue;
495 };
496 if item_name != &entry.name().item {
497 item_inconsistencies.push(ItemInconsistency::ItemNameMismatch {
498 item_id: *item_id,
499 map_name: item_name.clone(),
500 entry_name: entry.name().clone(),
501 });
502 }
503 let statement = match mz_sql::parse::parse(entry.create_sql()) {
504 Ok(mut statements) if statements.len() == 1 => {
505 let statement = statements.pop().expect("checked length");
506 statement.ast
507 }
508 Ok(_) => {
509 item_inconsistencies.push(ItemInconsistency::MultiCreateStatement {
510 create_sql: entry.create_sql().to_string(),
511 });
512 continue;
513 }
514 Err(e) => {
515 item_inconsistencies.push(ItemInconsistency::StatementParseFailure {
516 create_sql: entry.create_sql().to_string(),
517 e,
518 });
519 continue;
520 }
521 };
522 match statement {
523 Statement::CreateConnection(ast::CreateConnectionStatement {
524 name,
525 ..
526 })
527 | Statement::CreateWebhookSource(ast::CreateWebhookSourceStatement {
528 name,
529 ..
530 })
531 | Statement::CreateSource(ast::CreateSourceStatement { name, .. })
532 | Statement::CreateSubsource(ast::CreateSubsourceStatement {
533 name, ..
534 })
535 | Statement::CreateSink(ast::CreateSinkStatement {
536 name: Some(name),
537 ..
538 })
539 | Statement::CreateView(ast::CreateViewStatement {
540 definition: ast::ViewDefinition { name, .. },
541 ..
542 })
543 | Statement::CreateMaterializedView(
544 ast::CreateMaterializedViewStatement { name, .. },
545 )
546 | Statement::CreateTable(ast::CreateTableStatement { name, .. })
547 | Statement::CreateType(ast::CreateTypeStatement { name, .. })
548 | Statement::CreateSecret(ast::CreateSecretStatement { name, .. }) => {
549 let [db_component, schema_component, item_component] = &name.0[..]
550 else {
551 let name =
552 name.0.into_iter().map(|ident| ident.to_string()).collect();
553 item_inconsistencies.push(
554 ItemInconsistency::NonFullyQualifiedItemName {
555 create_sql: entry.create_sql().to_string(),
556 name,
557 },
558 );
559 continue;
560 };
561 if db_component.as_str() != &db.name
562 || schema_component.as_str() != &schema.name.schema
563 || item_component.as_str() != &entry.name().item
564 {
565 item_inconsistencies.push(
566 ItemInconsistency::CreateSqlItemNameMismatch {
567 item_name: vec![
568 db.name.clone(),
569 schema.name.schema.clone(),
570 entry.name().item.clone(),
571 ],
572 create_sql: entry.create_sql().to_string(),
573 },
574 );
575 }
576 }
577 Statement::CreateSchema(ast::CreateSchemaStatement { name, .. }) => {
578 let [db_component, schema_component] = &name.0[..] else {
579 let name =
580 name.0.into_iter().map(|ident| ident.to_string()).collect();
581 item_inconsistencies.push(
582 ItemInconsistency::NonFullyQualifiedSchemaName {
583 create_sql: entry.create_sql().to_string(),
584 name,
585 },
586 );
587 continue;
588 };
589 if db_component.as_str() != &db.name
590 || schema_component.as_str() != &schema.name.schema
591 {
592 item_inconsistencies.push(
593 ItemInconsistency::CreateSqlSchemaNameMismatch {
594 schema_name: vec![
595 db.name.clone(),
596 schema.name.schema.clone(),
597 ],
598 create_sql: entry.create_sql().to_string(),
599 },
600 );
601 }
602 }
603 Statement::CreateDatabase(ast::CreateDatabaseStatement {
604 name, ..
605 }) => {
606 if db.name != name.0.as_str() {
607 item_inconsistencies.push(
608 ItemInconsistency::CreateSqlDatabaseNameMismatch {
609 database_name: db.name.clone(),
610 create_sql: entry.create_sql().to_string(),
611 },
612 );
613 }
614 }
615 _ => (),
616 }
617 }
618 }
619 }
620
621 if item_inconsistencies.is_empty() {
622 Ok(())
623 } else {
624 Err(item_inconsistencies)
625 }
626 }
627}
628
/// A disagreement between two lookup structures of the catalog state, as
/// reported by `CatalogState::check_internal_fields`.
#[derive(Debug, Serialize, Clone, PartialEq)]
enum InternalFieldsInconsistency {
    /// `database_by_name` maps this name to an ID that is not a key of
    /// `database_by_id`.
    Database(String, DatabaseId),
    /// `ambient_schemas_by_name` maps this name to an ID that is not a key of
    /// `ambient_schemas_by_id`.
    AmbientSchema(String, SchemaId),
    /// `clusters_by_name` maps this name to an ID that is not a key of
    /// `clusters_by_id`.
    Cluster(String, ClusterId),
    /// `roles_by_name` maps this name to an ID that is not a key of
    /// `roles_by_id`.
    Role(String, RoleId),
    /// `source_references` has this key, but `entry_by_id` does not.
    SourceReferences(CatalogItemId),
    /// The entry with this item ID reports the listed global IDs, none of
    /// which are keys of `entry_by_global_id`.
    EntryMissingGlobalIds(CatalogItemId, Vec<GlobalId>),
    /// `entry_by_global_id` maps this global ID to an item ID that is not a
    /// key of `entry_by_id`.
    GlobalIdsMissingEntry(GlobalId, CatalogItemId),
}
639
/// A reference to a role ID that is not a key of `roles_by_id`, as reported
/// by `CatalogState::check_roles`.
#[derive(Debug, Serialize, Clone, PartialEq)]
enum RoleInconsistency {
    /// The database's owner role is unknown.
    Database(DatabaseId, RoleId),
    /// The schema's owner role is unknown.
    Schema(SchemaId, RoleId),
    /// The catalog entry's owner role is unknown.
    Entry(CatalogItemId, RoleId),
    /// The cluster's owner role is unknown.
    Cluster(ClusterId, RoleId),
    /// The owner role of the given replica of the given cluster is unknown.
    ClusterReplica(ClusterId, ReplicaId, RoleId),
    /// The role named by this default privilege object is unknown.
    DefaultPrivilege(DefaultPrivilegeObject),
    /// `role_auth_by_id` has a key for a role that is unknown.
    RoleAuth(RoleId),
    /// A default privilege grants to an unknown role. `grantor` is the role
    /// ID of the default privilege object; `grantee` is the unknown role.
    DefaultPrivilegeItem {
        grantor: RoleId,
        grantee: RoleId,
    },
    /// A system privilege names at least one unknown role. A field is
    /// `Some(id)` when that role is unknown and `None` when it exists.
    SystemPrivilege {
        grantor: Option<RoleId>,
        grantee: Option<RoleId>,
    },
    /// A role membership names at least one unknown role. A field is
    /// `Some(id)` when that role is unknown and `None` when it exists.
    Membership {
        parent: Option<RoleId>,
        grantor: Option<RoleId>,
    },
}
662
/// A problem with a stored comment, as reported by
/// `CatalogState::check_comments`.
#[derive(Debug, Serialize, Clone, PartialEq)]
enum CommentInconsistency {
    /// The comment is attached to an object that was not found in the
    /// catalog state.
    Dangling(CommentObjectId),
    /// The comment carries the given column position, but the item it is
    /// attached to reports `has_columns()` as false.
    NonRelation(CommentObjectId, usize),
}
670
/// A broken dependency edge between two catalog entries, as reported by
/// `CatalogState::check_object_dependencies`. In every variant `object_a` is
/// the entry being inspected and `object_b` is the other end of the edge.
#[derive(Debug, Serialize, Clone, PartialEq)]
enum ObjectDependencyInconsistency {
    /// `object_a` references or uses `object_b`, but `object_b` is not a key
    /// of `entry_by_id`.
    MissingUses {
        object_a: CatalogItemId,
        object_b: CatalogItemId,
    },
    /// `object_a` lists `object_b` in its `referenced_by()` or `used_by()`,
    /// but `object_b` is not a key of `entry_by_id`.
    MissingUsedBy {
        object_a: CatalogItemId,
        object_b: CatalogItemId,
    },
    /// `object_a` references or uses `object_b`, but `object_b` does not list
    /// `object_a` in the matching `referenced_by()` / `used_by()`.
    InconsistentUsedBy {
        object_a: CatalogItemId,
        object_b: CatalogItemId,
    },
    /// `object_a` lists `object_b` in its `referenced_by()` or `used_by()`,
    /// but `object_b` does not list `object_a` in the matching
    /// `references()` / `uses()`.
    InconsistentUses {
        object_a: CatalogItemId,
        object_b: CatalogItemId,
    },
}
694
/// A problem with a schema or an item inside a schema, as reported by
/// `CatalogState::check_items`.
#[derive(Debug, Serialize, Clone, PartialEq)]
enum ItemInconsistency {
    /// The key under which a schema is stored in a database's
    /// `schemas_by_name` differs from the name stored on the schema itself.
    KeyedName {
        db_schema_by_name: String,
        struct_name: String,
    },
    /// A database's `schemas_by_name` maps this name to an ID that is not a
    /// key of that database's `schemas_by_id`.
    MissingSchema {
        db_id: DatabaseId,
        schema_name: String,
    },
    /// A schema lists an item ID that is not a key of `entry_by_id`.
    NonExistentItem {
        db_id: DatabaseId,
        schema_id: SchemaSpecifier,
        item_id: CatalogItemId,
    },
    /// The key under which an item is stored in its schema differs from the
    /// item name stored on the entry.
    ItemNameMismatch {
        item_id: CatalogItemId,
        /// The key in the schema's item map.
        map_name: String,
        /// The name stored on the catalog entry.
        entry_name: QualifiedItemName,
    },
    /// The item's `create_sql` could not be parsed.
    StatementParseFailure {
        create_sql: String,
        e: ParserStatementError,
    },
    /// The item's `create_sql` parsed successfully, but not to exactly one
    /// statement.
    MultiCreateStatement { create_sql: String },
    /// The item name inside `create_sql` does not have exactly three
    /// components (database, schema, item). `name` holds the components found.
    NonFullyQualifiedItemName {
        create_sql: String,
        name: Vec<String>,
    },
    /// The schema name inside `create_sql` does not have exactly two
    /// components (database, schema). `name` holds the components found.
    NonFullyQualifiedSchemaName {
        create_sql: String,
        name: Vec<String>,
    },
    /// The item name inside `create_sql` differs from the names recorded in
    /// the catalog. `item_name` holds the catalog's database, schema and item
    /// names, in that order.
    CreateSqlItemNameMismatch {
        item_name: Vec<String>,
        create_sql: String,
    },
    /// The schema name inside `create_sql` differs from the names recorded in
    /// the catalog. `schema_name` holds the catalog's database and schema
    /// names, in that order.
    CreateSqlSchemaNameMismatch {
        schema_name: Vec<String>,
        create_sql: String,
    },
    /// The database name inside `create_sql` differs from the catalog's
    /// database name, which is held in `database_name`.
    CreateSqlDatabaseNameMismatch {
        database_name: String,
        create_sql: String,
    },
}