Skip to main content

mz_adapter/catalog/
consistency.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Internal consistency checks that validate invariants of [`CatalogState`].
11//!
12//! Note: the implementation of consistency checks should favor simplicity over performance, to
13//! make it as easy as possible to understand what a given check is doing.
14
15use mz_controller_types::{ClusterId, ReplicaId};
16use mz_repr::role_id::RoleId;
17use mz_repr::{CatalogItemId, GlobalId};
18use mz_sql::catalog::{CatalogItem, DefaultPrivilegeObject};
19use mz_sql::names::{
20    CommentObjectId, DatabaseId, QualifiedItemName, ResolvedDatabaseSpecifier, SchemaId,
21    SchemaSpecifier,
22};
23use mz_sql_parser::ast::{self, Statement};
24use mz_sql_parser::parser::ParserStatementError;
25use serde::Serialize;
26
27// DO NOT add any more imports from `crate` outside of `crate::catalog`.
28use super::CatalogState;
29
30#[derive(Debug, Default, Clone, Serialize, PartialEq)]
31pub struct CatalogInconsistencies {
32    /// Inconsistencies found with internal fields, if any.
33    internal_fields: Vec<InternalFieldsInconsistency>,
34    /// Inconsistencies found with roles, if any.
35    roles: Vec<RoleInconsistency>,
36    /// Inconsistencies found with comments, if any.
37    comments: Vec<CommentInconsistency>,
38    /// Inconsistencies found with object dependencies, if any.
39    object_dependencies: Vec<ObjectDependencyInconsistency>,
40    /// Inconsistencies found with items in the catalog, if any.
41    items: Vec<ItemInconsistency>,
42}
43
44impl CatalogInconsistencies {
45    pub fn is_empty(&self) -> bool {
46        let CatalogInconsistencies {
47            internal_fields,
48            roles,
49            comments,
50            object_dependencies,
51            items,
52        } = self;
53        internal_fields.is_empty()
54            && roles.is_empty()
55            && comments.is_empty()
56            && object_dependencies.is_empty()
57            && items.is_empty()
58    }
59}
60
61impl CatalogState {
62    /// Checks the [`CatalogState`] to make sure we're internally consistent.
63    pub fn check_consistency(&self) -> Result<(), Box<CatalogInconsistencies>> {
64        let mut inconsistencies = CatalogInconsistencies::default();
65
66        if let Err(internal_fields) = self.check_internal_fields() {
67            inconsistencies.internal_fields = internal_fields;
68        }
69        if let Err(roles) = self.check_roles() {
70            inconsistencies.roles = roles;
71        }
72        if let Err(comments) = self.check_comments() {
73            inconsistencies.comments = comments;
74        }
75        if let Err(dependencies) = self.check_object_dependencies() {
76            inconsistencies.object_dependencies = dependencies;
77        }
78        if let Err(items) = self.check_items() {
79            inconsistencies.items = items;
80        }
81
82        if inconsistencies.is_empty() {
83            Ok(())
84        } else {
85            Err(Box::new(inconsistencies))
86        }
87    }
88
89    /// # Invariants:
90    ///
91    /// * Any fields within [`CatalogState`] that reference another field need to be kept in sync.
92    ///
93    /// TODO(parkmycar): Check the reverse direction for these collections, e.g. all of the
94    /// `DatabaseId`s in `database_by_id` also exist in `database_by_name`.
95    fn check_internal_fields(&self) -> Result<(), Vec<InternalFieldsInconsistency>> {
96        let mut inconsistencies = Vec::new();
97        for (name, id) in &self.database_by_name {
98            if !self.database_by_id.contains_key(id) {
99                inconsistencies.push(InternalFieldsInconsistency::Database(name.clone(), *id));
100            }
101        }
102        for (name, id) in &self.ambient_schemas_by_name {
103            if !self.ambient_schemas_by_id.contains_key(id) {
104                inconsistencies.push(InternalFieldsInconsistency::AmbientSchema(
105                    name.clone(),
106                    *id,
107                ));
108            }
109        }
110        for (name, id) in &self.clusters_by_name {
111            if !self.clusters_by_id.contains_key(id) {
112                inconsistencies.push(InternalFieldsInconsistency::Cluster(name.clone(), *id));
113            }
114        }
115        for (name, role_id) in &self.roles_by_name {
116            if !self.roles_by_id.contains_key(role_id) {
117                inconsistencies.push(InternalFieldsInconsistency::Role(name.clone(), *role_id))
118            }
119        }
120
121        for (source_id, _references) in &self.source_references {
122            if !self.entry_by_id.contains_key(source_id) {
123                inconsistencies.push(InternalFieldsInconsistency::SourceReferences(*source_id));
124            }
125        }
126
127        for (item_id, entry) in &self.entry_by_id {
128            let missing_gids: Vec<_> = entry
129                .global_ids()
130                .filter(|gid| !self.entry_by_global_id.contains_key(gid))
131                .collect();
132            if !missing_gids.is_empty() {
133                inconsistencies.push(InternalFieldsInconsistency::EntryMissingGlobalIds(
134                    *item_id,
135                    missing_gids,
136                ));
137            }
138        }
139        for (gid, item_id) in &self.entry_by_global_id {
140            if !self.entry_by_id.contains_key(item_id) {
141                inconsistencies.push(InternalFieldsInconsistency::GlobalIdsMissingEntry(
142                    *gid, *item_id,
143                ));
144            }
145        }
146
147        if inconsistencies.is_empty() {
148            Ok(())
149        } else {
150            Err(inconsistencies)
151        }
152    }
153
154    /// # Invariants:
155    ///
156    /// * All RoleIds referenced from other objects must exist.
157    ///
158    fn check_roles(&self) -> Result<(), Vec<RoleInconsistency>> {
159        let mut inconsistencies = Vec::new();
160        for (database_id, database) in &self.database_by_id {
161            if !self.roles_by_id.contains_key(&database.owner_id) {
162                inconsistencies.push(RoleInconsistency::Database(*database_id, database.owner_id));
163            }
164            for (schema_id, schema) in &database.schemas_by_id {
165                if !self.roles_by_id.contains_key(&schema.owner_id) {
166                    inconsistencies.push(RoleInconsistency::Schema(*schema_id, schema.owner_id));
167                }
168            }
169        }
170        for (item_id, entry) in &self.entry_by_id {
171            if !self.roles_by_id.contains_key(entry.owner_id()) {
172                inconsistencies.push(RoleInconsistency::Entry(*item_id, entry.owner_id().clone()));
173            }
174        }
175        for (cluster_id, cluster) in &self.clusters_by_id {
176            if !self.roles_by_id.contains_key(&cluster.owner_id) {
177                inconsistencies.push(RoleInconsistency::Cluster(*cluster_id, cluster.owner_id));
178            }
179            for replica in cluster.replicas() {
180                if !self.roles_by_id.contains_key(&replica.owner_id) {
181                    inconsistencies.push(RoleInconsistency::ClusterReplica(
182                        *cluster_id,
183                        replica.replica_id,
184                        cluster.owner_id,
185                    ));
186                }
187            }
188        }
189        for (role_id, _) in &self.role_auth_by_id {
190            if !self.roles_by_id.contains_key(role_id) {
191                inconsistencies.push(RoleInconsistency::RoleAuth(role_id.clone()));
192            }
193        }
194        for (default_priv, privileges) in self.default_privileges.iter() {
195            if !self.roles_by_id.contains_key(&default_priv.role_id) {
196                inconsistencies.push(RoleInconsistency::DefaultPrivilege(default_priv.clone()));
197            }
198            for acl_item in privileges {
199                if !self.roles_by_id.contains_key(&acl_item.grantee) {
200                    inconsistencies.push(RoleInconsistency::DefaultPrivilegeItem {
201                        grantor: default_priv.role_id,
202                        grantee: acl_item.grantee,
203                    });
204                }
205            }
206        }
207        for acl in self.system_privileges.all_values() {
208            let grantor = self.roles_by_id.get(&acl.grantor);
209            let grantee = self.roles_by_id.get(&acl.grantee);
210
211            let inconsistency = match (grantor, grantee) {
212                (None, None) => RoleInconsistency::SystemPrivilege {
213                    grantor: Some(acl.grantor),
214                    grantee: Some(acl.grantee),
215                },
216                (Some(_), None) => RoleInconsistency::SystemPrivilege {
217                    grantor: None,
218                    grantee: Some(acl.grantee),
219                },
220                (None, Some(_)) => RoleInconsistency::SystemPrivilege {
221                    grantor: Some(acl.grantor),
222                    grantee: None,
223                },
224                (Some(_), Some(_)) => continue,
225            };
226            inconsistencies.push(inconsistency);
227        }
228        for role in self.roles_by_id.values() {
229            for (parent_id, grantor_id) in &role.membership.map {
230                let parent = self.roles_by_id.get(parent_id);
231                let grantor = self.roles_by_id.get(grantor_id);
232                let inconsistency = match (parent, grantor) {
233                    (None, None) => RoleInconsistency::Membership {
234                        parent: Some(*parent_id),
235                        grantor: Some(*grantor_id),
236                    },
237                    (Some(_), None) => RoleInconsistency::Membership {
238                        parent: None,
239                        grantor: Some(*grantor_id),
240                    },
241                    (None, Some(_)) => RoleInconsistency::Membership {
242                        parent: Some(*parent_id),
243                        grantor: None,
244                    },
245                    (Some(_), Some(_)) => continue,
246                };
247                inconsistencies.push(inconsistency);
248            }
249        }
250
251        if inconsistencies.is_empty() {
252            Ok(())
253        } else {
254            Err(inconsistencies)
255        }
256    }
257
258    /// # Invariants:
259    ///
260    /// * Comments should only reference existing objects.
261    /// * A comment should only have a column position if it references a relation.
262    ///
263    fn check_comments(&self) -> Result<(), Vec<CommentInconsistency>> {
264        let mut comment_inconsistencies = Vec::new();
265        for (comment_object_id, col_pos, _comment) in self.comments.iter() {
266            match comment_object_id {
267                CommentObjectId::Table(item_id)
268                | CommentObjectId::View(item_id)
269                | CommentObjectId::MaterializedView(item_id)
270                | CommentObjectId::Source(item_id)
271                | CommentObjectId::Sink(item_id)
272                | CommentObjectId::Index(item_id)
273                | CommentObjectId::Func(item_id)
274                | CommentObjectId::Connection(item_id)
275                | CommentObjectId::Type(item_id)
276                | CommentObjectId::Secret(item_id) => {
277                    let entry = self.entry_by_id.get(&item_id);
278                    match entry {
279                        None => comment_inconsistencies
280                            .push(CommentInconsistency::Dangling(comment_object_id)),
281                        Some(entry) => {
282                            // TODO: Refactor this to use if-let chains, once they're stable.
283                            #[allow(clippy::unnecessary_unwrap)]
284                            if !entry.has_columns() && col_pos.is_some() {
285                                let col_pos = col_pos.expect("checked above");
286                                comment_inconsistencies.push(CommentInconsistency::NonRelation(
287                                    comment_object_id,
288                                    col_pos,
289                                ));
290                            }
291                        }
292                    }
293                }
294                CommentObjectId::NetworkPolicy(network_policy_id) => {
295                    if !self.network_policies_by_id.contains_key(&network_policy_id) {
296                        comment_inconsistencies
297                            .push(CommentInconsistency::Dangling(comment_object_id));
298                    }
299                }
300
301                CommentObjectId::Role(role_id) => {
302                    if !self.roles_by_id.contains_key(&role_id) {
303                        comment_inconsistencies
304                            .push(CommentInconsistency::Dangling(comment_object_id));
305                    }
306                }
307                CommentObjectId::Database(database_id) => {
308                    if !self.database_by_id.contains_key(&database_id) {
309                        comment_inconsistencies
310                            .push(CommentInconsistency::Dangling(comment_object_id));
311                    }
312                }
313                CommentObjectId::Schema((database, schema)) => {
314                    match (database, schema) {
315                        (
316                            ResolvedDatabaseSpecifier::Id(database_id),
317                            SchemaSpecifier::Id(schema_id),
318                        ) => {
319                            let schema = self
320                                .database_by_id
321                                .get(&database_id)
322                                .and_then(|database| database.schemas_by_id.get(&schema_id));
323                            if schema.is_none() {
324                                comment_inconsistencies
325                                    .push(CommentInconsistency::Dangling(comment_object_id));
326                            }
327                        }
328                        (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Id(schema_id)) => {
329                            if !self.ambient_schemas_by_id.contains_key(&schema_id) {
330                                comment_inconsistencies
331                                    .push(CommentInconsistency::Dangling(comment_object_id));
332                            }
333                        }
334                        // Temporary schemas are in the ambient database.
335                        (ResolvedDatabaseSpecifier::Id(_id), SchemaSpecifier::Temporary) => (),
336                        // TODO: figure out how to check for consistency in this case.
337                        (ResolvedDatabaseSpecifier::Ambient, SchemaSpecifier::Temporary) => (),
338                    }
339                }
340                CommentObjectId::Cluster(cluster_id) => {
341                    if !self.clusters_by_id.contains_key(&cluster_id) {
342                        comment_inconsistencies
343                            .push(CommentInconsistency::Dangling(comment_object_id));
344                    }
345                }
346                CommentObjectId::ClusterReplica((cluster_id, replica_id)) => {
347                    let replica = self
348                        .clusters_by_id
349                        .get(&cluster_id)
350                        .and_then(|cluster| cluster.replica(replica_id));
351                    if replica.is_none() {
352                        comment_inconsistencies
353                            .push(CommentInconsistency::Dangling(comment_object_id));
354                    }
355                }
356            }
357        }
358
359        if comment_inconsistencies.is_empty() {
360            Ok(())
361        } else {
362            Err(comment_inconsistencies)
363        }
364    }
365
366    /// # Invariants:
367    ///
368    /// * All of the objects in the "uses" collection of a CatalogEntry, should contain said
369    ///   CatalogEntry in their own "used_by" collection.
370    /// * All of the objects in the "used_by" collection of a CatalogEntry, should contain said
371    ///   CatalogEntry in their own "uses" collection.
372    ///
373    fn check_object_dependencies(&self) -> Result<(), Vec<ObjectDependencyInconsistency>> {
374        let mut dependency_inconsistencies = vec![];
375
376        for (id, entry) in &self.entry_by_id {
377            for referenced_id in entry.references().items() {
378                let Some(referenced_entry) = self.entry_by_id.get(referenced_id) else {
379                    dependency_inconsistencies.push(ObjectDependencyInconsistency::MissingUses {
380                        object_a: *id,
381                        object_b: *referenced_id,
382                    });
383                    continue;
384                };
385                if !referenced_entry.referenced_by().contains(id) && referenced_entry.id() != *id {
386                    dependency_inconsistencies.push(
387                        ObjectDependencyInconsistency::InconsistentUsedBy {
388                            object_a: *id,
389                            object_b: *referenced_id,
390                        },
391                    );
392                }
393            }
394            for used_id in entry.uses() {
395                let Some(used_entry) = self.entry_by_id.get(&used_id) else {
396                    dependency_inconsistencies.push(ObjectDependencyInconsistency::MissingUses {
397                        object_a: *id,
398                        object_b: used_id,
399                    });
400                    continue;
401                };
402                if !used_entry.used_by().contains(id) && used_entry.id() != *id {
403                    dependency_inconsistencies.push(
404                        ObjectDependencyInconsistency::InconsistentUsedBy {
405                            object_a: *id,
406                            object_b: used_id,
407                        },
408                    );
409                }
410            }
411
412            for referenced_by in entry.referenced_by() {
413                let Some(referenced_by_entry) = self.entry_by_id.get(referenced_by) else {
414                    dependency_inconsistencies.push(ObjectDependencyInconsistency::MissingUsedBy {
415                        object_a: *id,
416                        object_b: *referenced_by,
417                    });
418                    continue;
419                };
420                if !referenced_by_entry.references().contains_item(id) {
421                    dependency_inconsistencies.push(
422                        ObjectDependencyInconsistency::InconsistentUses {
423                            object_a: *id,
424                            object_b: *referenced_by,
425                        },
426                    );
427                }
428            }
429            for used_by in entry.used_by() {
430                let Some(used_by_entry) = self.entry_by_id.get(used_by) else {
431                    dependency_inconsistencies.push(ObjectDependencyInconsistency::MissingUsedBy {
432                        object_a: *id,
433                        object_b: *used_by,
434                    });
435                    continue;
436                };
437                if !used_by_entry.uses().contains(id) {
438                    dependency_inconsistencies.push(
439                        ObjectDependencyInconsistency::InconsistentUses {
440                            object_a: *id,
441                            object_b: *used_by,
442                        },
443                    );
444                }
445            }
446        }
447
448        if dependency_inconsistencies.is_empty() {
449            Ok(())
450        } else {
451            Err(dependency_inconsistencies)
452        }
453    }
454
455    /// # Invariants
456    ///
457    /// * Every schema that exists in the `schemas_by_name` map, also exists in `schemas_by_id`.
458    /// * The name present in the `schemas_by_name` map matches the name in the associated `Schema`
459    ///   struct.
460    /// * All items that exist in a `Schema` struct, also exist in the `entries_by_id` map.
461    /// * Parsing the `create_sql` string from an `Entry` succeeds.
462    /// * The result of parsing the `create_sql` must return a single `Statement`.
463    /// * The names in the returned `Statement`, must match that of the parent struct.
464    /// * The item from the parsed `create_sql` must be fully qualified.
465    ///
466    fn check_items(&self) -> Result<(), Vec<ItemInconsistency>> {
467        let mut item_inconsistencies = vec![];
468
469        for (db_id, db) in &self.database_by_id {
470            for (schema_name, schema_id) in &db.schemas_by_name {
471                // Make sure the schema themselves are consistent.
472                let Some(schema) = db.schemas_by_id.get(schema_id) else {
473                    item_inconsistencies.push(ItemInconsistency::MissingSchema {
474                        db_id: *db_id,
475                        schema_name: schema_name.clone(),
476                    });
477                    continue;
478                };
479                if schema_name != &schema.name.schema {
480                    item_inconsistencies.push(ItemInconsistency::KeyedName {
481                        db_schema_by_name: schema_name.clone(),
482                        struct_name: schema.name.schema.clone(),
483                    });
484                }
485
486                // Make sure the items in the schema are consistent.
487                for (item_name, item_id) in &schema.items {
488                    let Some(entry) = self.entry_by_id.get(item_id) else {
489                        item_inconsistencies.push(ItemInconsistency::NonExistentItem {
490                            db_id: *db_id,
491                            schema_id: schema.id,
492                            item_id: *item_id,
493                        });
494                        continue;
495                    };
496                    if item_name != &entry.name().item {
497                        item_inconsistencies.push(ItemInconsistency::ItemNameMismatch {
498                            item_id: *item_id,
499                            map_name: item_name.clone(),
500                            entry_name: entry.name().clone(),
501                        });
502                    }
503                    let statement = match mz_sql::parse::parse(entry.create_sql()) {
504                        Ok(mut statements) if statements.len() == 1 => {
505                            let statement = statements.pop().expect("checked length");
506                            statement.ast
507                        }
508                        Ok(_) => {
509                            item_inconsistencies.push(ItemInconsistency::MultiCreateStatement {
510                                create_sql: entry.create_sql().to_string(),
511                            });
512                            continue;
513                        }
514                        Err(e) => {
515                            item_inconsistencies.push(ItemInconsistency::StatementParseFailure {
516                                create_sql: entry.create_sql().to_string(),
517                                e,
518                            });
519                            continue;
520                        }
521                    };
522                    match statement {
523                        Statement::CreateConnection(ast::CreateConnectionStatement {
524                            name,
525                            ..
526                        })
527                        | Statement::CreateWebhookSource(ast::CreateWebhookSourceStatement {
528                            name,
529                            ..
530                        })
531                        | Statement::CreateSource(ast::CreateSourceStatement { name, .. })
532                        | Statement::CreateSubsource(ast::CreateSubsourceStatement {
533                            name, ..
534                        })
535                        | Statement::CreateSink(ast::CreateSinkStatement {
536                            name: Some(name),
537                            ..
538                        })
539                        | Statement::CreateView(ast::CreateViewStatement {
540                            definition: ast::ViewDefinition { name, .. },
541                            ..
542                        })
543                        | Statement::CreateMaterializedView(
544                            ast::CreateMaterializedViewStatement { name, .. },
545                        )
546                        | Statement::CreateTable(ast::CreateTableStatement { name, .. })
547                        | Statement::CreateType(ast::CreateTypeStatement { name, .. })
548                        | Statement::CreateSecret(ast::CreateSecretStatement { name, .. }) => {
549                            let [db_component, schema_component, item_component] = &name.0[..]
550                            else {
551                                let name =
552                                    name.0.into_iter().map(|ident| ident.to_string()).collect();
553                                item_inconsistencies.push(
554                                    ItemInconsistency::NonFullyQualifiedItemName {
555                                        create_sql: entry.create_sql().to_string(),
556                                        name,
557                                    },
558                                );
559                                continue;
560                            };
561                            if db_component.as_str() != &db.name
562                                || schema_component.as_str() != &schema.name.schema
563                                || item_component.as_str() != &entry.name().item
564                            {
565                                item_inconsistencies.push(
566                                    ItemInconsistency::CreateSqlItemNameMismatch {
567                                        item_name: vec![
568                                            db.name.clone(),
569                                            schema.name.schema.clone(),
570                                            entry.name().item.clone(),
571                                        ],
572                                        create_sql: entry.create_sql().to_string(),
573                                    },
574                                );
575                            }
576                        }
577                        Statement::CreateSchema(ast::CreateSchemaStatement { name, .. }) => {
578                            let [db_component, schema_component] = &name.0[..] else {
579                                let name =
580                                    name.0.into_iter().map(|ident| ident.to_string()).collect();
581                                item_inconsistencies.push(
582                                    ItemInconsistency::NonFullyQualifiedSchemaName {
583                                        create_sql: entry.create_sql().to_string(),
584                                        name,
585                                    },
586                                );
587                                continue;
588                            };
589                            if db_component.as_str() != &db.name
590                                || schema_component.as_str() != &schema.name.schema
591                            {
592                                item_inconsistencies.push(
593                                    ItemInconsistency::CreateSqlSchemaNameMismatch {
594                                        schema_name: vec![
595                                            db.name.clone(),
596                                            schema.name.schema.clone(),
597                                        ],
598                                        create_sql: entry.create_sql().to_string(),
599                                    },
600                                );
601                            }
602                        }
603                        Statement::CreateDatabase(ast::CreateDatabaseStatement {
604                            name, ..
605                        }) => {
606                            if db.name != name.0.as_str() {
607                                item_inconsistencies.push(
608                                    ItemInconsistency::CreateSqlDatabaseNameMismatch {
609                                        database_name: db.name.clone(),
610                                        create_sql: entry.create_sql().to_string(),
611                                    },
612                                );
613                            }
614                        }
615                        _ => (),
616                    }
617                }
618            }
619        }
620
621        if item_inconsistencies.is_empty() {
622            Ok(())
623        } else {
624            Err(item_inconsistencies)
625        }
626    }
627}
628
629#[derive(Debug, Serialize, Clone, PartialEq)]
630enum InternalFieldsInconsistency {
631    Database(String, DatabaseId),
632    AmbientSchema(String, SchemaId),
633    Cluster(String, ClusterId),
634    Role(String, RoleId),
635    SourceReferences(CatalogItemId),
636    EntryMissingGlobalIds(CatalogItemId, Vec<GlobalId>),
637    GlobalIdsMissingEntry(GlobalId, CatalogItemId),
638}
639
640#[derive(Debug, Serialize, Clone, PartialEq)]
641enum RoleInconsistency {
642    Database(DatabaseId, RoleId),
643    Schema(SchemaId, RoleId),
644    Entry(CatalogItemId, RoleId),
645    Cluster(ClusterId, RoleId),
646    ClusterReplica(ClusterId, ReplicaId, RoleId),
647    DefaultPrivilege(DefaultPrivilegeObject),
648    RoleAuth(RoleId),
649    DefaultPrivilegeItem {
650        grantor: RoleId,
651        grantee: RoleId,
652    },
653    SystemPrivilege {
654        grantor: Option<RoleId>,
655        grantee: Option<RoleId>,
656    },
657    Membership {
658        parent: Option<RoleId>,
659        grantor: Option<RoleId>,
660    },
661}
662
663#[derive(Debug, Serialize, Clone, PartialEq)]
664enum CommentInconsistency {
665    /// A comment was found for an object that no longer exists.
666    Dangling(CommentObjectId),
667    /// A comment with a column position was found on a non-relation.
668    NonRelation(CommentObjectId, usize),
669}
670
671#[derive(Debug, Serialize, Clone, PartialEq)]
672enum ObjectDependencyInconsistency {
673    /// Object A uses Object B, but Object B does not exist.
674    MissingUses {
675        object_a: CatalogItemId,
676        object_b: CatalogItemId,
677    },
678    /// Object A is used by Object B, but Object B does not exist.
679    MissingUsedBy {
680        object_a: CatalogItemId,
681        object_b: CatalogItemId,
682    },
683    /// Object A uses Object B, but Object B does not record that it is used by Object A.
684    InconsistentUsedBy {
685        object_a: CatalogItemId,
686        object_b: CatalogItemId,
687    },
688    /// Object B is used by Object A, but Object B does not record that is uses Object A.
689    InconsistentUses {
690        object_a: CatalogItemId,
691        object_b: CatalogItemId,
692    },
693}
694
695#[derive(Debug, Serialize, Clone, PartialEq)]
696enum ItemInconsistency {
697    /// The name in a `Database` `schemas_by_name` does not match the name on the `Schema` struct.
698    KeyedName {
699        db_schema_by_name: String,
700        struct_name: String,
701    },
702    /// A schema present in a `Database` `schemas_by_name` map is not in the `schema_by_id` map.
703    MissingSchema {
704        db_id: DatabaseId,
705        schema_name: String,
706    },
707    /// An item in a `Schema` `items` collection does not exist.
708    NonExistentItem {
709        db_id: DatabaseId,
710        schema_id: SchemaSpecifier,
711        item_id: CatalogItemId,
712    },
713    /// An item in the `Schema` `items` collection has a mismatched name.
714    ItemNameMismatch {
715        item_id: CatalogItemId,
716        /// Name from the `items` map.
717        map_name: String,
718        /// Name on the entry itself.
719        entry_name: QualifiedItemName,
720    },
721    /// Failed to parse the `create_sql` persisted with an item.
722    StatementParseFailure {
723        create_sql: String,
724        e: ParserStatementError,
725    },
726    /// Parsing the `create_sql` returned multiple Statements.
727    MultiCreateStatement { create_sql: String },
728    /// The name from a parsed `create_sql` statement, is not fully qualified.
729    NonFullyQualifiedItemName {
730        create_sql: String,
731        name: Vec<String>,
732    },
733    /// The name from a parsed `create_sql` statement, is not fully qualified.
734    NonFullyQualifiedSchemaName {
735        create_sql: String,
736        name: Vec<String>,
737    },
738    /// The name from a parsed `create_sql` statement, did not match that from the parent item.
739    CreateSqlItemNameMismatch {
740        item_name: Vec<String>,
741        create_sql: String,
742    },
743    /// The name from a parsed `create_sql` statement, did not match that from the parent schema.
744    CreateSqlSchemaNameMismatch {
745        schema_name: Vec<String>,
746        create_sql: String,
747    },
748    /// The name from a parsed `create_sql` statement, did not match that from the parent database.
749    CreateSqlDatabaseNameMismatch {
750        database_name: String,
751        create_sql: String,
752    },
753}