mz_adapter/catalog/
consistency.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Internal consistency checks that validate invariants of [`CatalogState`].
11//!
12//! Note: the implementation of consistency checks should favor simplicity over performance, to
13//! make it as easy as possible to understand what a given check is doing.
14
15use mz_controller_types::{ClusterId, ReplicaId};
16use mz_repr::role_id::RoleId;
17use mz_repr::{CatalogItemId, GlobalId};
18use mz_sql::catalog::{CatalogItem, DefaultPrivilegeObject};
19use mz_sql::names::{
20    CommentObjectId, DatabaseId, QualifiedItemName, ResolvedDatabaseSpecifier, SchemaId,
21    SchemaSpecifier,
22};
23use mz_sql_parser::ast::{self, Statement};
24use mz_sql_parser::parser::ParserStatementError;
25use serde::Serialize;
26
27// DO NOT add any more imports from `crate` outside of `crate::catalog`.
28use super::CatalogState;
29
/// Every inconsistency found by [`CatalogState::check_consistency`], grouped by the check that
/// reported it.
///
/// Each collection is left empty when the corresponding check passes; see
/// [`CatalogInconsistencies::is_empty`].
#[derive(Debug, Default, Clone, Serialize, PartialEq)]
pub struct CatalogInconsistencies {
    /// Inconsistencies found with internal fields, if any.
    internal_fields: Vec<InternalFieldsInconsistency>,
    /// Inconsistencies found with roles, if any.
    roles: Vec<RoleInconsistency>,
    /// Inconsistencies found with comments, if any.
    comments: Vec<CommentInconsistency>,
    /// Inconsistencies found with object dependencies, if any.
    object_dependencies: Vec<ObjectDependencyInconsistency>,
    /// Inconsistencies found with items in the catalog, if any.
    items: Vec<ItemInconsistency>,
}
43
44impl CatalogInconsistencies {
45    pub fn is_empty(&self) -> bool {
46        let CatalogInconsistencies {
47            internal_fields,
48            roles,
49            comments,
50            object_dependencies,
51            items,
52        } = self;
53        internal_fields.is_empty()
54            && roles.is_empty()
55            && comments.is_empty()
56            && object_dependencies.is_empty()
57            && items.is_empty()
58    }
59}
60
61impl CatalogState {
62    /// Checks the [`CatalogState`] to make sure we're internally consistent.
63    pub fn check_consistency(&self) -> Result<(), Box<CatalogInconsistencies>> {
64        let mut inconsistencies = CatalogInconsistencies::default();
65
66        if let Err(internal_fields) = self.check_internal_fields() {
67            inconsistencies.internal_fields = internal_fields;
68        }
69        if let Err(roles) = self.check_roles() {
70            inconsistencies.roles = roles;
71        }
72        if let Err(comments) = self.check_comments() {
73            inconsistencies.comments = comments;
74        }
75        if let Err(dependencies) = self.check_object_dependencies() {
76            inconsistencies.object_dependencies = dependencies;
77        }
78        if let Err(items) = self.check_items() {
79            inconsistencies.items = items;
80        }
81
82        if inconsistencies.is_empty() {
83            Ok(())
84        } else {
85            Err(Box::new(inconsistencies))
86        }
87    }
88
89    /// # Invariants:
90    ///
91    /// * Any fields within [`CatalogState`] that reference another field need to be kept in sync.
92    ///
93    /// TODO(parkmycar): Check the reverse direction for these collections, e.g. all of the
94    /// `DatabaseId`s in `database_by_id` also exist in `database_by_name`.
95    fn check_internal_fields(&self) -> Result<(), Vec<InternalFieldsInconsistency>> {
96        let mut inconsistencies = Vec::new();
97        for (name, id) in &self.database_by_name {
98            if !self.database_by_id.contains_key(id) {
99                inconsistencies.push(InternalFieldsInconsistency::Database(name.clone(), *id));
100            }
101        }
102        for (name, id) in &self.ambient_schemas_by_name {
103            if !self.ambient_schemas_by_id.contains_key(id) {
104                inconsistencies.push(InternalFieldsInconsistency::AmbientSchema(
105                    name.clone(),
106                    *id,
107                ));
108            }
109        }
110        for (name, id) in &self.clusters_by_name {
111            if !self.clusters_by_id.contains_key(id) {
112                inconsistencies.push(InternalFieldsInconsistency::Cluster(name.clone(), *id));
113            }
114        }
115        for (name, role_id) in &self.roles_by_name {
116            if !self.roles_by_id.contains_key(role_id) {
117                inconsistencies.push(InternalFieldsInconsistency::Role(name.clone(), *role_id))
118            }
119        }
120
121        for (source_id, _references) in &self.source_references {
122            if !self.entry_by_id.contains_key(source_id) {
123                inconsistencies.push(InternalFieldsInconsistency::SourceReferences(*source_id));
124            }
125        }
126
127        for (item_id, entry) in &self.entry_by_id {
128            let missing_gids: Vec<_> = entry
129                .global_ids()
130                .filter(|gid| !self.entry_by_global_id.contains_key(gid))
131                .collect();
132            if !missing_gids.is_empty() {
133                inconsistencies.push(InternalFieldsInconsistency::EntryMissingGlobalIds(
134                    *item_id,
135                    missing_gids,
136                ));
137            }
138        }
139        for (gid, item_id) in &self.entry_by_global_id {
140            if !self.entry_by_id.contains_key(item_id) {
141                inconsistencies.push(InternalFieldsInconsistency::GlobalIdsMissingEntry(
142                    *gid, *item_id,
143                ));
144            }
145        }
146
147        if inconsistencies.is_empty() {
148            Ok(())
149        } else {
150            Err(inconsistencies)
151        }
152    }
153
154    /// # Invariants:
155    ///
156    /// * All RoleIds referenced from other objects must exist.
157    ///
158    fn check_roles(&self) -> Result<(), Vec<RoleInconsistency>> {
159        let mut inconsistencies = Vec::new();
160        for (database_id, database) in &self.database_by_id {
161            if !self.roles_by_id.contains_key(&database.owner_id) {
162                inconsistencies.push(RoleInconsistency::Database(*database_id, database.owner_id));
163            }
164            for (schema_id, schema) in &database.schemas_by_id {
165                if !self.roles_by_id.contains_key(&schema.owner_id) {
166                    inconsistencies.push(RoleInconsistency::Schema(*schema_id, schema.owner_id));
167                }
168            }
169        }
170        for (item_id, entry) in &self.entry_by_id {
171            if !self.roles_by_id.contains_key(entry.owner_id()) {
172                inconsistencies.push(RoleInconsistency::Entry(*item_id, entry.owner_id().clone()));
173            }
174        }
175        for (cluster_id, cluster) in &self.clusters_by_id {
176            if !self.roles_by_id.contains_key(&cluster.owner_id) {
177                inconsistencies.push(RoleInconsistency::Cluster(*cluster_id, cluster.owner_id));
178            }
179            for replica in cluster.replicas() {
180                if !self.roles_by_id.contains_key(&replica.owner_id) {
181                    inconsistencies.push(RoleInconsistency::ClusterReplica(
182                        *cluster_id,
183                        replica.replica_id,
184                        cluster.owner_id,
185                    ));
186                }
187            }
188        }
189        for (role_id, _) in &self.role_auth_by_id {
190            if !self.roles_by_id.contains_key(role_id) {
191                inconsistencies.push(RoleInconsistency::RoleAuth(role_id.clone()));
192            }
193        }
194        for (default_priv, privileges) in self.default_privileges.iter() {
195            if !self.roles_by_id.contains_key(&default_priv.role_id) {
196                inconsistencies.push(RoleInconsistency::DefaultPrivilege(default_priv.clone()));
197            }
198            for acl_item in privileges {
199                if !self.roles_by_id.contains_key(&acl_item.grantee) {
200                    inconsistencies.push(RoleInconsistency::DefaultPrivilegeItem {
201                        grantor: default_priv.role_id,
202                        grantee: acl_item.grantee,
203                    });
204                }
205            }
206        }
207        for acl in self.system_privileges.all_values() {
208            let grantor = self.roles_by_id.get(&acl.grantor);
209            let grantee = self.roles_by_id.get(&acl.grantee);
210
211            let inconsistency = match (grantor, grantee) {
212                (None, None) => RoleInconsistency::SystemPrivilege {
213                    grantor: Some(acl.grantor),
214                    grantee: Some(acl.grantee),
215                },
216                (Some(_), None) => RoleInconsistency::SystemPrivilege {
217                    grantor: None,
218                    grantee: Some(acl.grantee),
219                },
220                (None, Some(_)) => RoleInconsistency::SystemPrivilege {
221                    grantor: Some(acl.grantor),
222                    grantee: None,
223                },
224                (Some(_), Some(_)) => continue,
225            };
226            inconsistencies.push(inconsistency);
227        }
228        for role in self.roles_by_id.values() {
229            for (parent_id, grantor_id) in &role.membership.map {
230                let parent = self.roles_by_id.get(parent_id);
231                let grantor = self.roles_by_id.get(grantor_id);
232                let inconsistency = match (parent, grantor) {
233                    (None, None) => RoleInconsistency::Membership {
234                        parent: Some(*parent_id),
235                        grantor: Some(*grantor_id),
236                    },
237                    (Some(_), None) => RoleInconsistency::Membership {
238                        parent: None,
239                        grantor: Some(*grantor_id),
240                    },
241                    (None, Some(_)) => RoleInconsistency::Membership {
242                        parent: Some(*parent_id),
243                        grantor: None,
244                    },
245                    (Some(_), Some(_)) => continue,
246                };
247                inconsistencies.push(inconsistency);
248            }
249        }
250
251        if inconsistencies.is_empty() {
252            Ok(())
253        } else {
254            Err(inconsistencies)
255        }
256    }
257
258    /// # Invariants:
259    ///
260    /// * Comments should only reference existing objects.
261    /// * A comment should only have a column position if it references a relation.
262    ///
263    fn check_comments(&self) -> Result<(), Vec<CommentInconsistency>> {
264        let mut comment_inconsistencies = Vec::new();
265        for (comment_object_id, col_pos, _comment) in self.comments.iter() {
266            match comment_object_id {
267                CommentObjectId::Table(item_id)
268                | CommentObjectId::View(item_id)
269                | CommentObjectId::MaterializedView(item_id)
270                | CommentObjectId::Source(item_id)
271                | CommentObjectId::Sink(item_id)
272                | CommentObjectId::Index(item_id)
273                | CommentObjectId::Func(item_id)
274                | CommentObjectId::Connection(item_id)
275                | CommentObjectId::Type(item_id)
276                | CommentObjectId::Secret(item_id)
277                | CommentObjectId::ContinualTask(item_id) => {
278                    let entry = self.entry_by_id.get(&item_id);
279                    match entry {
280                        None => comment_inconsistencies
281                            .push(CommentInconsistency::Dangling(comment_object_id)),
282                        Some(entry) => {
283                            // TODO: Refactor this to use if-let chains, once they're stable.
284                            #[allow(clippy::unnecessary_unwrap)]
285                            if !entry.has_columns() && col_pos.is_some() {
286                                let col_pos = col_pos.expect("checked above");
287                                comment_inconsistencies.push(CommentInconsistency::NonRelation(
288                                    comment_object_id,
289                                    col_pos,
290                                ));
291                            }
292                        }
293                    }
294                }
295                CommentObjectId::NetworkPolicy(network_policy_id) => {
296                    if !self.network_policies_by_id.contains_key(&network_policy_id) {
297                        comment_inconsistencies
298                            .push(CommentInconsistency::Dangling(comment_object_id));
299                    }
300                }
301
302                CommentObjectId::Role(role_id) => {
303                    if !self.roles_by_id.contains_key(&role_id) {
304                        comment_inconsistencies
305                            .push(CommentInconsistency::Dangling(comment_object_id));
306                    }
307                }
308                CommentObjectId::Database(database_id) => {
309                    if !self.database_by_id.contains_key(&database_id) {
310                        comment_inconsistencies
311                            .push(CommentInconsistency::Dangling(comment_object_id));
312                    }
313                }
314                CommentObjectId::Schema((database, schema)) => {
315                    match (database, schema) {
316                        (
317                            ResolvedDatabaseSpecifier::Id(database_id),
318                            SchemaSpecifier::Id(schema_id),
319                        ) => {
320                            let schema = self
321                                .database_by_id
322                                .get(&database_id)
323                                .and_then(|database| database.schemas_by_id.get(&schema_id));
324                            if schema.is_none() {
325                                comment_inconsistencies
326                                    .push(CommentInconsistency::Dangling(comment_object_id));
327                            }
328                        }
329                        (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(schema_id)) => {
330                            if !self.ambient_schemas_by_id.contains_key(&schema_id) {
331                                comment_inconsistencies
332                                    .push(CommentInconsistency::Dangling(comment_object_id));
333                            }
334                        }
335                        // Temporary schemas are in the ambient database.
336                        (ResolvedDatabaseSpecifier::Id(_id), SchemaSpecifier::Temporary) => (),
337                        // TODO: figure out how to check for consistency in this case.
338                        (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => (),
339                    }
340                }
341                CommentObjectId::Cluster(cluster_id) => {
342                    if !self.clusters_by_id.contains_key(&cluster_id) {
343                        comment_inconsistencies
344                            .push(CommentInconsistency::Dangling(comment_object_id));
345                    }
346                }
347                CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
348                    let replica = self
349                        .clusters_by_id
350                        .get(&cluster_id)
351                        .and_then(|cluster| cluster.replica(replica_id));
352                    if replica.is_none() {
353                        comment_inconsistencies
354                            .push(CommentInconsistency::Dangling(comment_object_id));
355                    }
356                }
357            }
358        }
359
360        if comment_inconsistencies.is_empty() {
361            Ok(())
362        } else {
363            Err(comment_inconsistencies)
364        }
365    }
366
367    /// # Invariants:
368    ///
369    /// * All of the objects in the "uses" collection of a CatalogEntry, should contain said
370    ///   CatalogEntry in their own "used_by" collection.
371    /// * All of the objects in the "used_by" collection of a CatalogEntry, should contain said
372    ///   CatalogEntry in their own "uses" collection.
373    ///
374    fn check_object_dependencies(&self) -> Result<(), Vec<ObjectDependencyInconsistency>> {
375        let mut dependency_inconsistencies = vec![];
376
377        for (id, entry) in &self.entry_by_id {
378            for referenced_id in entry.references().items() {
379                let Some(referenced_entry) = self.entry_by_id.get(referenced_id) else {
380                    dependency_inconsistencies.push(ObjectDependencyInconsistency::MissingUses {
381                        object_a: *id,
382                        object_b: *referenced_id,
383                    });
384                    continue;
385                };
386                if !referenced_entry.referenced_by().contains(id)
387                    // Continual Tasks are self referential.
388                    && (referenced_entry.id() != *id && !referenced_entry.is_continual_task())
389                {
390                    dependency_inconsistencies.push(
391                        ObjectDependencyInconsistency::InconsistentUsedBy {
392                            object_a: *id,
393                            object_b: *referenced_id,
394                        },
395                    );
396                }
397            }
398            for used_id in entry.uses() {
399                let Some(used_entry) = self.entry_by_id.get(&used_id) else {
400                    dependency_inconsistencies.push(ObjectDependencyInconsistency::MissingUses {
401                        object_a: *id,
402                        object_b: used_id,
403                    });
404                    continue;
405                };
406                if !used_entry.used_by().contains(id)
407                    // Continual Tasks are self referential.
408                    && (used_entry.id() != *id && !used_entry.is_continual_task())
409                {
410                    dependency_inconsistencies.push(
411                        ObjectDependencyInconsistency::InconsistentUsedBy {
412                            object_a: *id,
413                            object_b: used_id,
414                        },
415                    );
416                }
417            }
418
419            for referenced_by in entry.referenced_by() {
420                let Some(referenced_by_entry) = self.entry_by_id.get(referenced_by) else {
421                    dependency_inconsistencies.push(ObjectDependencyInconsistency::MissingUsedBy {
422                        object_a: *id,
423                        object_b: *referenced_by,
424                    });
425                    continue;
426                };
427                if !referenced_by_entry.references().contains_item(id) {
428                    dependency_inconsistencies.push(
429                        ObjectDependencyInconsistency::InconsistentUses {
430                            object_a: *id,
431                            object_b: *referenced_by,
432                        },
433                    );
434                }
435            }
436            for used_by in entry.used_by() {
437                let Some(used_by_entry) = self.entry_by_id.get(used_by) else {
438                    dependency_inconsistencies.push(ObjectDependencyInconsistency::MissingUsedBy {
439                        object_a: *id,
440                        object_b: *used_by,
441                    });
442                    continue;
443                };
444                if !used_by_entry.uses().contains(id) {
445                    dependency_inconsistencies.push(
446                        ObjectDependencyInconsistency::InconsistentUses {
447                            object_a: *id,
448                            object_b: *used_by,
449                        },
450                    );
451                }
452            }
453        }
454
455        if dependency_inconsistencies.is_empty() {
456            Ok(())
457        } else {
458            Err(dependency_inconsistencies)
459        }
460    }
461
    /// # Invariants
    ///
    /// * Every schema that exists in the `schemas_by_name` map, also exists in `schemas_by_id`.
    /// * The name present in the `schemas_by_name` map matches the name in the associated `Schema`
    ///   struct.
    /// * All items that exist in a `Schema` struct, also exist in the `entry_by_id` map.
    /// * Parsing the `create_sql` string from an `Entry` succeeds.
    /// * The result of parsing the `create_sql` must return a single `Statement`.
    /// * The names in the returned `Statement`, must match that of the parent struct.
    /// * The item from the parsed `create_sql` must be fully qualified.
    ///
    /// Only schemas reachable through `database_by_id` are visited by this check. Statement
    /// kinds that are not listed in the `match` below are accepted without a name comparison.
    fn check_items(&self) -> Result<(), Vec<ItemInconsistency>> {
        let mut item_inconsistencies = vec![];

        for (db_id, db) in &self.database_by_id {
            for (schema_name, schema_id) in &db.schemas_by_name {
                // Make sure the schema themselves are consistent.
                let Some(schema) = db.schemas_by_id.get(schema_id) else {
                    item_inconsistencies.push(ItemInconsistency::MissingSchema {
                        db_id: *db_id,
                        schema_name: schema_name.clone(),
                    });
                    // Without the schema there are no items to inspect.
                    continue;
                };
                if schema_name != &schema.name.schema {
                    item_inconsistencies.push(ItemInconsistency::KeyedName {
                        db_schema_by_name: schema_name.clone(),
                        struct_name: schema.name.schema.clone(),
                    });
                }

                // Make sure the items in the schema are consistent.
                for (item_name, item_id) in &schema.items {
                    let Some(entry) = self.entry_by_id.get(item_id) else {
                        item_inconsistencies.push(ItemInconsistency::NonExistentItem {
                            db_id: *db_id,
                            schema_id: schema.id,
                            item_id: *item_id,
                        });
                        continue;
                    };
                    if item_name != &entry.name().item {
                        item_inconsistencies.push(ItemInconsistency::ItemNameMismatch {
                            item_id: *item_id,
                            map_name: item_name.clone(),
                            entry_name: entry.name().clone(),
                        });
                    }
                    // The persisted `create_sql` must parse to exactly one statement; anything
                    // else is reported and the item is skipped.
                    let statement = match mz_sql::parse::parse(entry.create_sql()) {
                        Ok(mut statements) if statements.len() == 1 => {
                            let statement = statements.pop().expect("checked length");
                            statement.ast
                        }
                        Ok(_) => {
                            item_inconsistencies.push(ItemInconsistency::MultiCreateStatement {
                                create_sql: entry.create_sql().to_string(),
                            });
                            continue;
                        }
                        Err(e) => {
                            item_inconsistencies.push(ItemInconsistency::StatementParseFailure {
                                create_sql: entry.create_sql().to_string(),
                                e,
                            });
                            continue;
                        }
                    };
                    match statement {
                        // Statements that carry an item name. A `CreateSink` whose name is
                        // `None` does not match here and falls through to the catch-all arm.
                        Statement::CreateConnection(ast::CreateConnectionStatement {
                            name,
                            ..
                        })
                        | Statement::CreateWebhookSource(ast::CreateWebhookSourceStatement {
                            name,
                            ..
                        })
                        | Statement::CreateSource(ast::CreateSourceStatement { name, .. })
                        | Statement::CreateSubsource(ast::CreateSubsourceStatement {
                            name, ..
                        })
                        | Statement::CreateSink(ast::CreateSinkStatement {
                            name: Some(name),
                            ..
                        })
                        | Statement::CreateView(ast::CreateViewStatement {
                            definition: ast::ViewDefinition { name, .. },
                            ..
                        })
                        | Statement::CreateMaterializedView(
                            ast::CreateMaterializedViewStatement { name, .. },
                        )
                        | Statement::CreateTable(ast::CreateTableStatement { name, .. })
                        | Statement::CreateType(ast::CreateTypeStatement { name, .. })
                        | Statement::CreateSecret(ast::CreateSecretStatement { name, .. }) => {
                            // A fully qualified item name has exactly three components:
                            // database, schema, item.
                            let [db_component, schema_component, item_component] = &name.0[..]
                            else {
                                let name =
                                    name.0.into_iter().map(|ident| ident.to_string()).collect();
                                item_inconsistencies.push(
                                    ItemInconsistency::NonFullyQualifiedItemName {
                                        create_sql: entry.create_sql().to_string(),
                                        name,
                                    },
                                );
                                continue;
                            };
                            if db_component.as_str() != &db.name
                                || schema_component.as_str() != &schema.name.schema
                                || item_component.as_str() != &entry.name().item
                            {
                                // The reported `item_name` is the name expected from the
                                // catalog structs, not the one found in the SQL.
                                item_inconsistencies.push(
                                    ItemInconsistency::CreateSqlItemNameMismatch {
                                        item_name: vec![
                                            db.name.clone(),
                                            schema.name.schema.clone(),
                                            entry.name().item.clone(),
                                        ],
                                        create_sql: entry.create_sql().to_string(),
                                    },
                                );
                            }
                        }
                        Statement::CreateSchema(ast::CreateSchemaStatement { name, .. }) => {
                            // A fully qualified schema name has exactly two components:
                            // database, schema.
                            let [db_component, schema_component] = &name.0[..] else {
                                let name =
                                    name.0.into_iter().map(|ident| ident.to_string()).collect();
                                item_inconsistencies.push(
                                    ItemInconsistency::NonFullyQualifiedSchemaName {
                                        create_sql: entry.create_sql().to_string(),
                                        name,
                                    },
                                );
                                continue;
                            };
                            if db_component.as_str() != &db.name
                                || schema_component.as_str() != &schema.name.schema
                            {
                                item_inconsistencies.push(
                                    ItemInconsistency::CreateSqlSchemaNameMismatch {
                                        schema_name: vec![
                                            db.name.clone(),
                                            schema.name.schema.clone(),
                                        ],
                                        create_sql: entry.create_sql().to_string(),
                                    },
                                );
                            }
                        }
                        Statement::CreateDatabase(ast::CreateDatabaseStatement {
                            name, ..
                        }) => {
                            if db.name != name.0.as_str() {
                                item_inconsistencies.push(
                                    ItemInconsistency::CreateSqlDatabaseNameMismatch {
                                        database_name: db.name.clone(),
                                        create_sql: entry.create_sql().to_string(),
                                    },
                                );
                            }
                        }
                        // Any other statement kind is not name-checked.
                        _ => (),
                    }
                }
            }
        }

        if item_inconsistencies.is_empty() {
            Ok(())
        } else {
            Err(item_inconsistencies)
        }
    }
634}
635
/// A mismatch between two fields of `CatalogState` that are expected to stay in sync.
#[derive(Debug, Serialize, Clone, PartialEq)]
enum InternalFieldsInconsistency {
    /// `database_by_name` maps this name to an id that is not a key of `database_by_id`.
    Database(String, DatabaseId),
    /// `ambient_schemas_by_name` maps this name to an id that is not a key of
    /// `ambient_schemas_by_id`.
    AmbientSchema(String, SchemaId),
    /// `clusters_by_name` maps this name to an id that is not a key of `clusters_by_id`.
    Cluster(String, ClusterId),
    /// `roles_by_name` maps this name to an id that is not a key of `roles_by_id`.
    Role(String, RoleId),
    /// `source_references` has this key, but `entry_by_id` has no entry for it.
    SourceReferences(CatalogItemId),
    /// The entry with this id lists global ids that are not keys of `entry_by_global_id`.
    EntryMissingGlobalIds(CatalogItemId, Vec<GlobalId>),
    /// `entry_by_global_id` maps this global id to an item id that is not a key of
    /// `entry_by_id`.
    GlobalIdsMissingEntry(GlobalId, CatalogItemId),
}
646
/// A reference to a role that could not be found in `roles_by_id`.
#[derive(Debug, Serialize, Clone, PartialEq)]
enum RoleInconsistency {
    /// The owner of this database does not exist.
    Database(DatabaseId, RoleId),
    /// The owner of this schema does not exist.
    Schema(SchemaId, RoleId),
    /// The owner of this catalog entry does not exist.
    Entry(CatalogItemId, RoleId),
    /// The owner of this cluster does not exist.
    Cluster(ClusterId, RoleId),
    /// The owner of this replica (identified together with its cluster) does not exist.
    ClusterReplica(ClusterId, ReplicaId, RoleId),
    /// The role of this default privilege object does not exist.
    DefaultPrivilege(DefaultPrivilegeObject),
    /// `role_auth_by_id` has an entry for a role that does not exist.
    RoleAuth(RoleId),
    /// The grantee of a default privilege ACL item does not exist. `grantor` is the role of the
    /// default privilege object the item belongs to.
    DefaultPrivilegeItem {
        grantor: RoleId,
        grantee: RoleId,
    },
    /// A system privilege refers to a missing role. A field is `Some(id)` when that role is
    /// missing and `None` when it exists.
    SystemPrivilege {
        grantor: Option<RoleId>,
        grantee: Option<RoleId>,
    },
    /// A role membership refers to a missing role. A field is `Some(id)` when that role is
    /// missing and `None` when it exists.
    Membership {
        parent: Option<RoleId>,
        grantor: Option<RoleId>,
    },
}
669
/// A problem found with a comment stored in the catalog.
#[derive(Debug, Serialize, Clone, PartialEq)]
enum CommentInconsistency {
    /// A comment was found for an object that does not exist in the catalog.
    Dangling(CommentObjectId),
    /// A comment with a column position was found on a non-relation, i.e. an item that does not
    /// have columns. The `usize` is that column position.
    NonRelation(CommentObjectId, usize),
}
677
/// A problem found in the dependency bookkeeping between catalog entries.
///
/// "Uses" / "used by" below also stands for the "references" / "referenced by" collections,
/// which are reported with the same variants.
#[derive(Debug, Serialize, Clone, PartialEq)]
enum ObjectDependencyInconsistency {
    /// Object A uses Object B, but Object B does not exist.
    MissingUses {
        object_a: CatalogItemId,
        object_b: CatalogItemId,
    },
    /// Object A is used by Object B, but Object B does not exist.
    MissingUsedBy {
        object_a: CatalogItemId,
        object_b: CatalogItemId,
    },
    /// Object A uses Object B, but Object B does not record that it is used by Object A.
    InconsistentUsedBy {
        object_a: CatalogItemId,
        object_b: CatalogItemId,
    },
    /// Object A is used by Object B, but Object B does not record that it uses Object A.
    InconsistentUses {
        object_a: CatalogItemId,
        object_b: CatalogItemId,
    },
}
701
/// A problem found with a schema, or with an item stored in a schema.
#[derive(Debug, Serialize, Clone, PartialEq)]
enum ItemInconsistency {
    /// The name in a `Database` `schemas_by_name` does not match the name on the `Schema` struct.
    KeyedName {
        /// Key used in `schemas_by_name`.
        db_schema_by_name: String,
        /// Name stored on the `Schema` struct.
        struct_name: String,
    },
    /// A schema present in a `Database` `schemas_by_name` map is not in the `schemas_by_id` map.
    MissingSchema {
        db_id: DatabaseId,
        schema_name: String,
    },
    /// An item in a `Schema` `items` collection does not exist.
    NonExistentItem {
        db_id: DatabaseId,
        schema_id: SchemaSpecifier,
        item_id: CatalogItemId,
    },
    /// An item in the `Schema` `items` collection has a mismatched name.
    ItemNameMismatch {
        item_id: CatalogItemId,
        /// Name from the `items` map.
        map_name: String,
        /// Name on the entry itself.
        entry_name: QualifiedItemName,
    },
    /// Failed to parse the `create_sql` persisted with an item.
    StatementParseFailure {
        create_sql: String,
        /// The error returned by the parser.
        e: ParserStatementError,
    },
    /// Parsing the `create_sql` did not return exactly one `Statement`.
    MultiCreateStatement { create_sql: String },
    /// The item name from a parsed `create_sql` statement, is not fully qualified.
    NonFullyQualifiedItemName {
        create_sql: String,
        /// The name components found in the statement.
        name: Vec<String>,
    },
    /// The schema name from a parsed `create_sql` statement, is not fully qualified.
    NonFullyQualifiedSchemaName {
        create_sql: String,
        /// The name components found in the statement.
        name: Vec<String>,
    },
    /// The name from a parsed `create_sql` statement, did not match that from the parent item.
    CreateSqlItemNameMismatch {
        /// The expected name components, taken from the catalog structs.
        item_name: Vec<String>,
        create_sql: String,
    },
    /// The name from a parsed `create_sql` statement, did not match that from the parent schema.
    CreateSqlSchemaNameMismatch {
        /// The expected name components, taken from the catalog structs.
        schema_name: Vec<String>,
        create_sql: String,
    },
    /// The name from a parsed `create_sql` statement, did not match that from the parent database.
    CreateSqlDatabaseNameMismatch {
        /// The expected database name, taken from the catalog structs.
        database_name: String,
        create_sql: String,
    },
}