mz_adapter/coord/
consistency.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Internal consistency checks that validate invariants of [`Coordinator`].
11
12use super::Coordinator;
13use crate::catalog::consistency::CatalogInconsistencies;
14use mz_adapter_types::connection::ConnectionIdType;
15use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source, Table, TableDataSource};
16use mz_controller_types::{ClusterId, ReplicaId};
17use mz_ore::instrument;
18use mz_repr::{CatalogItemId, GlobalId};
19use mz_sql::catalog::{CatalogCluster, CatalogClusterReplica};
20use serde::Serialize;
21
/// Every inconsistency found by [`Coordinator::check_consistency`], grouped by the part of the
/// coordinator's state in which it was found.
///
/// `check_consistency` starts from [`Default`] and only fills in the categories whose individual
/// check failed; [`CoordinatorInconsistencies::is_empty`] reports whether any category is
/// non-empty.
#[derive(Debug, Default, Serialize, PartialEq)]
pub struct CoordinatorInconsistencies {
    /// Inconsistencies found in the catalog.
    catalog_inconsistencies: Box<CatalogInconsistencies>,
    /// Inconsistencies found in read holds.
    read_holds: Vec<ReadHoldsInconsistency>,
    /// Inconsistencies found with our map of active webhooks.
    active_webhooks: Vec<ActiveWebhookInconsistency>,
    /// Inconsistencies found with our map of cluster statuses.
    cluster_statuses: Vec<ClusterStatusInconsistency>,
}
33
34impl CoordinatorInconsistencies {
35    pub fn is_empty(&self) -> bool {
36        self.catalog_inconsistencies.is_empty()
37            && self.read_holds.is_empty()
38            && self.active_webhooks.is_empty()
39            && self.cluster_statuses.is_empty()
40    }
41}
42
43impl Coordinator {
44    /// Checks the [`Coordinator`] to make sure we're internally consistent.
45    #[instrument(name = "coord::check_consistency")]
46    pub fn check_consistency(&self) -> Result<(), CoordinatorInconsistencies> {
47        let mut inconsistencies = CoordinatorInconsistencies::default();
48
49        if let Err(catalog_inconsistencies) = self.catalog().state().check_consistency() {
50            inconsistencies.catalog_inconsistencies = catalog_inconsistencies;
51        }
52
53        if let Err(read_holds) = self.check_read_holds() {
54            inconsistencies.read_holds = read_holds;
55        }
56
57        if let Err(active_webhooks) = self.check_active_webhooks() {
58            inconsistencies.active_webhooks = active_webhooks;
59        }
60
61        if let Err(cluster_statuses) = self.check_cluster_statuses() {
62            inconsistencies.cluster_statuses = cluster_statuses;
63        }
64
65        if inconsistencies.is_empty() {
66            Ok(())
67        } else {
68            Err(inconsistencies)
69        }
70    }
71
72    /// # Invariants:
73    ///
74    /// * Read holds should reference known objects.
75    ///
76    fn check_read_holds(&self) -> Result<(), Vec<ReadHoldsInconsistency>> {
77        let mut inconsistencies = Vec::new();
78
79        for timeline in self.global_timelines.values() {
80            for id in timeline.read_holds.storage_ids() {
81                if self.catalog().try_get_entry_by_global_id(&id).is_none() {
82                    inconsistencies.push(ReadHoldsInconsistency::Storage(id));
83                }
84            }
85            for (cluster_id, id) in timeline.read_holds.compute_ids() {
86                if self.catalog().try_get_cluster(cluster_id).is_none() {
87                    inconsistencies.push(ReadHoldsInconsistency::Cluster(cluster_id));
88                }
89                if !id.is_transient() && self.catalog().try_get_entry_by_global_id(&id).is_none() {
90                    inconsistencies.push(ReadHoldsInconsistency::Compute(id));
91                }
92            }
93        }
94
95        for conn_id in self.txn_read_holds.keys() {
96            if !self.active_conns.contains_key(conn_id) {
97                inconsistencies.push(ReadHoldsInconsistency::Transaction(conn_id.unhandled()));
98            }
99        }
100
101        if inconsistencies.is_empty() {
102            Ok(())
103        } else {
104            Err(inconsistencies)
105        }
106    }
107
108    /// # Invariants
109    ///
110    /// * All [`GlobalId`]s in the `active_webhooks` map should reference known webhook sources.
111    ///
112    fn check_active_webhooks(&self) -> Result<(), Vec<ActiveWebhookInconsistency>> {
113        let mut inconsistencies = vec![];
114        for (id, _) in &self.active_webhooks {
115            let is_webhook = self
116                .catalog()
117                .try_get_entry(id)
118                .map(|entry| entry.item())
119                .and_then(|item| {
120                    let data_source = match &item {
121                        CatalogItem::Source(Source { data_source, .. }) => data_source,
122                        CatalogItem::Table(Table {
123                            data_source: TableDataSource::DataSource { desc, .. },
124                            ..
125                        }) => desc,
126                        _ => return None,
127                    };
128                    Some(matches!(data_source, DataSourceDesc::Webhook { .. }))
129                })
130                .unwrap_or(false);
131            if !is_webhook {
132                inconsistencies.push(ActiveWebhookInconsistency::NonExistentWebhook(*id));
133            }
134        }
135
136        if inconsistencies.is_empty() {
137            Ok(())
138        } else {
139            Err(inconsistencies)
140        }
141    }
142
143    /// # Invariants
144    ///
145    /// * All [`ClusterId`]s in the `cluster_replica_statuses` map should reference known clusters.
146    /// * All [`ReplicaId`]s in the `cluster_replica_statuses` map should reference known cluster
147    /// replicas.
148    fn check_cluster_statuses(&self) -> Result<(), Vec<ClusterStatusInconsistency>> {
149        let mut inconsistencies = vec![];
150        for (cluster_id, replica_status) in &self.cluster_replica_statuses.0 {
151            if self.catalog().try_get_cluster(*cluster_id).is_none() {
152                inconsistencies.push(ClusterStatusInconsistency::NonExistentCluster(*cluster_id));
153            }
154            for replica_id in replica_status.keys() {
155                if self
156                    .catalog()
157                    .try_get_cluster_replica(*cluster_id, *replica_id)
158                    .is_none()
159                {
160                    inconsistencies.push(ClusterStatusInconsistency::NonExistentReplica(
161                        *cluster_id,
162                        *replica_id,
163                    ));
164                }
165            }
166        }
167        for cluster in self.catalog().clusters() {
168            if let Some(cluster_statuses) = self.cluster_replica_statuses.0.get(&cluster.id()) {
169                for replica in cluster.replicas() {
170                    if !cluster_statuses.contains_key(&replica.replica_id()) {
171                        inconsistencies.push(ClusterStatusInconsistency::NonExistentReplicaStatus(
172                            cluster.name.clone(),
173                            replica.name.clone(),
174                            cluster.id(),
175                            replica.replica_id(),
176                        ));
177                    }
178                }
179            } else {
180                inconsistencies.push(ClusterStatusInconsistency::NonExistentClusterStatus(
181                    cluster.name.clone(),
182                    cluster.id(),
183                ));
184            }
185        }
186        if inconsistencies.is_empty() {
187            Ok(())
188        } else {
189            Err(inconsistencies)
190        }
191    }
192}
193
/// An inconsistency found while validating the coordinator's read holds.
#[derive(Debug, Serialize, PartialEq, Eq)]
enum ReadHoldsInconsistency {
    /// A storage read hold in a global timeline references an id with no catalog entry.
    Storage(GlobalId),
    /// A compute read hold references a non-transient id with no catalog entry.
    Compute(GlobalId),
    /// A compute read hold references a cluster that is not in the catalog.
    Cluster(ClusterId),
    /// Transaction read holds exist for a connection (identified by its raw id) that is not in
    /// the set of active connections.
    Transaction(ConnectionIdType),
}
201
/// An inconsistency found while validating the coordinator's map of active webhooks.
#[derive(Debug, Serialize, PartialEq, Eq)]
enum ActiveWebhookInconsistency {
    /// An `active_webhooks` entry whose id has no catalog entry, or resolves to an item that is
    /// not a webhook-backed source or table.
    NonExistentWebhook(CatalogItemId),
}
206
/// An inconsistency found while validating the coordinator's map of cluster replica statuses.
#[derive(Debug, Serialize, PartialEq, Eq)]
enum ClusterStatusInconsistency {
    /// The status map has an entry for a cluster that is not in the catalog.
    NonExistentCluster(ClusterId),
    /// The status map has a status for a `(cluster, replica)` pair that is not in the catalog.
    NonExistentReplica(ClusterId, ReplicaId),
    /// A catalog cluster, given as `(cluster name, cluster id)`, has no entry in the status map.
    NonExistentClusterStatus(String, ClusterId),
    /// A catalog replica, given as `(cluster name, replica name, cluster id, replica id)`, has no
    /// status in its cluster's entry of the status map.
    NonExistentReplicaStatus(String, String, ClusterId, ReplicaId),
}