use super::Coordinator;
use crate::catalog::consistency::CatalogInconsistencies;
use mz_adapter_types::connection::ConnectionIdType;
use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source};
use mz_ore::instrument;
use mz_repr::GlobalId;
use serde::Serialize;
#[derive(Debug, Default, Serialize, PartialEq)]
pub struct CoordinatorInconsistencies {
catalog_inconsistencies: Box<CatalogInconsistencies>,
read_capabilities: Vec<ReadCapabilitiesInconsistency>,
active_webhooks: Vec<ActiveWebhookInconsistency>,
}
impl CoordinatorInconsistencies {
pub fn is_empty(&self) -> bool {
self.catalog_inconsistencies.is_empty()
&& self.read_capabilities.is_empty()
&& self.active_webhooks.is_empty()
}
}
impl Coordinator {
#[instrument(name = "coord::check_consistency")]
pub fn check_consistency(&self) -> Result<(), CoordinatorInconsistencies> {
let mut inconsistencies = CoordinatorInconsistencies::default();
if let Err(catalog_inconsistencies) = self.catalog().state().check_consistency() {
inconsistencies.catalog_inconsistencies = catalog_inconsistencies;
}
if let Err(read_capabilities) = self.check_read_capabilities() {
inconsistencies.read_capabilities = read_capabilities;
}
if let Err(active_webhooks) = self.check_active_webhooks() {
inconsistencies.active_webhooks = active_webhooks;
}
if inconsistencies.is_empty() {
Ok(())
} else {
Err(inconsistencies)
}
}
fn check_read_capabilities(&self) -> Result<(), Vec<ReadCapabilitiesInconsistency>> {
let mut read_capabilities_inconsistencies = Vec::new();
for (gid, _) in &self.storage_read_capabilities {
if self.catalog().try_get_entry(gid).is_none() {
read_capabilities_inconsistencies
.push(ReadCapabilitiesInconsistency::Storage(gid.clone()));
}
}
for (gid, _) in &self.compute_read_capabilities {
if !gid.is_transient() && self.catalog().try_get_entry(gid).is_none() {
read_capabilities_inconsistencies
.push(ReadCapabilitiesInconsistency::Compute(gid.clone()));
}
}
for (conn_id, _) in &self.txn_read_holds {
if !self.active_conns.contains_key(conn_id) {
read_capabilities_inconsistencies.push(ReadCapabilitiesInconsistency::Transaction(
conn_id.unhandled(),
));
}
}
if read_capabilities_inconsistencies.is_empty() {
Ok(())
} else {
Err(read_capabilities_inconsistencies)
}
}
fn check_active_webhooks(&self) -> Result<(), Vec<ActiveWebhookInconsistency>> {
let mut inconsistencies = vec![];
for (id, _) in &self.active_webhooks {
let is_webhook = self
.catalog()
.try_get_entry(id)
.map(|entry| entry.item())
.and_then(|item| {
let CatalogItem::Source(Source { data_source, .. }) = &item else {
return None;
};
Some(matches!(data_source, DataSourceDesc::Webhook { .. }))
})
.unwrap_or(false);
if !is_webhook {
inconsistencies.push(ActiveWebhookInconsistency::NonExistentWebhook(*id));
}
}
if inconsistencies.is_empty() {
Ok(())
} else {
Err(inconsistencies)
}
}
}
#[derive(Debug, Serialize, PartialEq, Eq)]
enum ReadCapabilitiesInconsistency {
Storage(GlobalId),
Compute(GlobalId),
Transaction(ConnectionIdType),
}
#[derive(Debug, Serialize, PartialEq, Eq)]
enum ActiveWebhookInconsistency {
NonExistentWebhook(GlobalId),
}