// mz_adapter/coord/consistency.rs
1use super::Coordinator;
13use crate::catalog::consistency::CatalogInconsistencies;
14use mz_adapter_types::connection::ConnectionIdType;
15use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source, Table, TableDataSource};
16use mz_controller_types::{ClusterId, ReplicaId};
17use mz_ore::instrument;
18use mz_repr::{CatalogItemId, GlobalId};
19use mz_sql::catalog::{CatalogCluster, CatalogClusterReplica};
20use serde::Serialize;
21
/// Everything the coordinator-level consistency check found to be inconsistent,
/// grouped by subsystem. An empty value (see [`CoordinatorInconsistencies::is_empty`])
/// means the coordinator is internally consistent.
#[derive(Debug, Default, Serialize, PartialEq)]
pub struct CoordinatorInconsistencies {
    /// Inconsistencies found within the catalog itself.
    catalog_inconsistencies: Box<CatalogInconsistencies>,
    /// Read holds referencing objects, clusters, or connections that no longer exist.
    read_holds: Vec<ReadHoldsInconsistency>,
    /// Active webhooks whose backing catalog item is missing or not a webhook.
    active_webhooks: Vec<ActiveWebhookInconsistency>,
    /// Mismatches between tracked replica statuses and the catalog's clusters/replicas.
    cluster_statuses: Vec<ClusterStatusInconsistency>,
}
33
34impl CoordinatorInconsistencies {
35 pub fn is_empty(&self) -> bool {
36 self.catalog_inconsistencies.is_empty()
37 && self.read_holds.is_empty()
38 && self.active_webhooks.is_empty()
39 && self.cluster_statuses.is_empty()
40 }
41}
42
43impl Coordinator {
44 #[instrument(name = "coord::check_consistency")]
46 pub fn check_consistency(&self) -> Result<(), CoordinatorInconsistencies> {
47 let mut inconsistencies = CoordinatorInconsistencies::default();
48
49 if let Err(catalog_inconsistencies) = self.catalog().state().check_consistency() {
50 inconsistencies.catalog_inconsistencies = catalog_inconsistencies;
51 }
52
53 if let Err(read_holds) = self.check_read_holds() {
54 inconsistencies.read_holds = read_holds;
55 }
56
57 if let Err(active_webhooks) = self.check_active_webhooks() {
58 inconsistencies.active_webhooks = active_webhooks;
59 }
60
61 if let Err(cluster_statuses) = self.check_cluster_statuses() {
62 inconsistencies.cluster_statuses = cluster_statuses;
63 }
64
65 if inconsistencies.is_empty() {
66 Ok(())
67 } else {
68 Err(inconsistencies)
69 }
70 }
71
72 fn check_read_holds(&self) -> Result<(), Vec<ReadHoldsInconsistency>> {
77 let mut inconsistencies = Vec::new();
78
79 for timeline in self.global_timelines.values() {
80 for id in timeline.read_holds.storage_ids() {
81 if self.catalog().try_get_entry_by_global_id(&id).is_none() {
82 inconsistencies.push(ReadHoldsInconsistency::Storage(id));
83 }
84 }
85 for (cluster_id, id) in timeline.read_holds.compute_ids() {
86 if self.catalog().try_get_cluster(cluster_id).is_none() {
87 inconsistencies.push(ReadHoldsInconsistency::Cluster(cluster_id));
88 }
89 if !id.is_transient() && self.catalog().try_get_entry_by_global_id(&id).is_none() {
90 inconsistencies.push(ReadHoldsInconsistency::Compute(id));
91 }
92 }
93 }
94
95 for conn_id in self.txn_read_holds.keys() {
96 if !self.active_conns.contains_key(conn_id) {
97 inconsistencies.push(ReadHoldsInconsistency::Transaction(conn_id.unhandled()));
98 }
99 }
100
101 if inconsistencies.is_empty() {
102 Ok(())
103 } else {
104 Err(inconsistencies)
105 }
106 }
107
108 fn check_active_webhooks(&self) -> Result<(), Vec<ActiveWebhookInconsistency>> {
113 let mut inconsistencies = vec![];
114 for (id, _) in &self.active_webhooks {
115 let is_webhook = self
116 .catalog()
117 .try_get_entry(id)
118 .map(|entry| entry.item())
119 .and_then(|item| {
120 let data_source = match &item {
121 CatalogItem::Source(Source { data_source, .. }) => data_source,
122 CatalogItem::Table(Table {
123 data_source: TableDataSource::DataSource { desc, .. },
124 ..
125 }) => desc,
126 _ => return None,
127 };
128 Some(matches!(data_source, DataSourceDesc::Webhook { .. }))
129 })
130 .unwrap_or(false);
131 if !is_webhook {
132 inconsistencies.push(ActiveWebhookInconsistency::NonExistentWebhook(*id));
133 }
134 }
135
136 if inconsistencies.is_empty() {
137 Ok(())
138 } else {
139 Err(inconsistencies)
140 }
141 }
142
143 fn check_cluster_statuses(&self) -> Result<(), Vec<ClusterStatusInconsistency>> {
149 let mut inconsistencies = vec![];
150 for (cluster_id, replica_status) in &self.cluster_replica_statuses.0 {
151 if self.catalog().try_get_cluster(*cluster_id).is_none() {
152 inconsistencies.push(ClusterStatusInconsistency::NonExistentCluster(*cluster_id));
153 }
154 for replica_id in replica_status.keys() {
155 if self
156 .catalog()
157 .try_get_cluster_replica(*cluster_id, *replica_id)
158 .is_none()
159 {
160 inconsistencies.push(ClusterStatusInconsistency::NonExistentReplica(
161 *cluster_id,
162 *replica_id,
163 ));
164 }
165 }
166 }
167 for cluster in self.catalog().clusters() {
168 if let Some(cluster_statuses) = self.cluster_replica_statuses.0.get(&cluster.id()) {
169 for replica in cluster.replicas() {
170 if !cluster_statuses.contains_key(&replica.replica_id()) {
171 inconsistencies.push(ClusterStatusInconsistency::NonExistentReplicaStatus(
172 cluster.name.clone(),
173 replica.name.clone(),
174 cluster.id(),
175 replica.replica_id(),
176 ));
177 }
178 }
179 } else {
180 inconsistencies.push(ClusterStatusInconsistency::NonExistentClusterStatus(
181 cluster.name.clone(),
182 cluster.id(),
183 ));
184 }
185 }
186 if inconsistencies.is_empty() {
187 Ok(())
188 } else {
189 Err(inconsistencies)
190 }
191 }
192}
193
/// A read hold that references something which no longer exists.
#[derive(Debug, Serialize, PartialEq, Eq)]
enum ReadHoldsInconsistency {
    /// A storage read hold whose `GlobalId` has no catalog entry.
    Storage(GlobalId),
    /// A compute read hold whose (non-transient) `GlobalId` has no catalog entry.
    Compute(GlobalId),
    /// A compute read hold referencing a cluster that is not in the catalog.
    Cluster(ClusterId),
    /// A transaction read hold whose connection is no longer active.
    Transaction(ConnectionIdType),
}
201
/// An inconsistency in the set of tracked active webhooks.
#[derive(Debug, Serialize, PartialEq, Eq)]
enum ActiveWebhookInconsistency {
    /// A tracked webhook whose catalog item is missing, or exists but is not a
    /// webhook source / webhook-backed table.
    NonExistentWebhook(CatalogItemId),
}
206
/// A mismatch between tracked cluster replica statuses and the catalog.
#[derive(Debug, Serialize, PartialEq, Eq)]
enum ClusterStatusInconsistency {
    /// A status is tracked for a cluster that does not exist in the catalog.
    NonExistentCluster(ClusterId),
    /// A status is tracked for a replica that does not exist in the catalog.
    NonExistentReplica(ClusterId, ReplicaId),
    /// A catalog cluster (name, id) has no tracked status at all.
    NonExistentClusterStatus(String, ClusterId),
    /// A catalog replica (cluster name, replica name, ids) has no tracked status.
    NonExistentReplicaStatus(String, String, ClusterId, ReplicaId),
}