use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::io::Write as _;
use std::str::FromStr;
use std::time::Duration;
use anyhow::{anyhow, bail, Context};
use mz_ore::retry::{Retry, RetryResult};
use mz_persist_client::{PersistLocation, ShardId};
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use crate::action::{ControlFlow, State};
use crate::parser::BuiltinCommand;
// Selects how consistency checks are applied.
//
// Values parse from the lowercase strings `file`, `statement` and `disable`
// (see the `FromStr` impl for this type); `File` is the `Default`.
//
// NOTE(review): plain `//` comments are used here on purpose. `clap::ValueEnum`
// can pick up `///` doc attributes as CLI help text, so doc comments would
// change user-visible output.
//
// NOTE(review): the code that acts on a `Level` is not visible in this part of
// the file, so the per-variant notes below are inferred from the variant names
// only -- confirm against the call site.
#[derive(clap::ValueEnum, Default, Debug, Copy, Clone, PartialEq, Eq)]
pub enum Level {
// Presumably: run the checks once per file. This is the default variant.
#[default]
File,
// Presumably: run the checks after every statement.
Statement,
// Presumably: do not run the checks at all.
Disable,
}
/// Parses a [`Level`] from its exact, lowercase textual name.
///
/// Accepted inputs are `"file"`, `"statement"` and `"disable"`. Matching is
/// case-sensitive and no trimming is performed; anything else yields an error
/// message that echoes the rejected input.
impl FromStr for Level {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let level = if s == "file" {
            Level::File
        } else if s == "statement" {
            Level::Statement
        } else if s == "disable" {
            Level::Disable
        } else {
            return Err(format!("Unknown consistency check level: {s}"));
        };
        Ok(level)
    }
}
/// Builtin command handler that switches consistency checks off.
///
/// The command must carry a `reason` argument; its value is logged at `info`
/// level. On success `state.consistency_checks_adhoc_skip` is set to `true`,
/// which makes `run_consistency_checks` return early without checking.
///
/// # Errors
///
/// Fails (with the context "must provide reason for skipping") when the
/// `reason` argument cannot be read from the command.
pub fn skip_consistency_checks(
    mut cmd: BuiltinCommand,
    state: &mut State,
) -> Result<ControlFlow, anyhow::Error> {
    // Insist on a reason so that every skip is explained in the logs.
    let maybe_reason = cmd.args.string("reason");
    let why = maybe_reason.context("must provide reason for skipping")?;
    tracing::info!(reason = why, "Skipping consistency checks as requested.");

    state.consistency_checks_adhoc_skip = true;
    Ok(ControlFlow::Continue)
}
pub async fn run_consistency_checks(state: &State) -> Result<ControlFlow, anyhow::Error> {
if state.consistency_checks_adhoc_skip {
return Ok(ControlFlow::Continue);
}
let coordinator = check_coordinator(state).await.context("coordinator");
let catalog_state = check_catalog_state(state).await.context("catalog state");
let mut msg = String::new();
if let Err(e) = coordinator {
writeln!(&mut msg, "coordinator inconsistency: {e:?}")?;
}
if let Err(e) = catalog_state {
writeln!(&mut msg, "catalog inconsistency: {e:?}")?;
}
if msg.is_empty() {
Ok(ControlFlow::Continue)
} else {
Err(anyhow!("{msg}"))
}
}
/// Builtin command handler that verifies a persist shard has been tombstoned.
///
/// Reads the `shard-id` argument from the command and hands it, still as text,
/// to `check_shard_tombstoned`.
///
/// # Errors
///
/// Fails when the `shard-id` argument cannot be read, or when the underlying
/// check fails.
pub async fn run_check_shard_tombstoned(
    mut cmd: BuiltinCommand,
    state: &State,
) -> Result<ControlFlow, anyhow::Error> {
    let raw_shard_id = cmd.args.string("shard-id")?;
    check_shard_tombstoned(state, &raw_shard_id)
        .await
        .map(|()| ControlFlow::Continue)
}
/// Asks the server's internal HTTP endpoint to validate the coordinator.
///
/// Two requests are made against `state.materialize.internal_http_addr`:
///
/// 1. `GET /api/coordinator/dump` must answer with a success status or with
///    `404 Not Found`.
/// 2. `GET /api/coordinator/check` (retried for up to two seconds) must answer
///    with a JSON value that is exactly the empty string.
///
/// # Errors
///
/// Fails when a request cannot be made, when the dump endpoint answers with an
/// unexpected status, when the check endpoint answers `404 Not Found`, when the
/// response body is not valid JSON, or when the reported value is anything
/// other than an empty JSON string.
async fn check_coordinator(state: &State) -> Result<(), anyhow::Error> {
    let addr = &state.materialize.internal_http_addr;

    // Step 1: the coordinator state must be dumpable.
    let dump_url = format!("http://{}/api/coordinator/dump", addr);
    let dump_response = reqwest::get(&dump_url).await?;
    let dump_status = dump_response.status();
    // NOTE(review): 404 is tolerated here, presumably for servers that do not
    // expose this endpoint -- confirm.
    let dump_acceptable = dump_status.is_success() || dump_status == StatusCode::NOT_FOUND;
    if !dump_acceptable {
        let body: Result<serde_json::Value, _> = dump_response.json().await;
        bail!("Coordinator failed to dump state: {:?}", body);
    }

    // Step 2: run the invariant check, retrying request failures for a short while.
    let check_url = format!("http://{}/api/coordinator/check", addr);
    let check_response = Retry::default()
        .max_duration(Duration::from_secs(2))
        .retry_async(|_| async { reqwest::get(&check_url).await })
        .await
        .context("querying coordinator")?;
    if check_response.status() == StatusCode::NOT_FOUND {
        bail!("Coordinator consistency check not available");
    }

    let inconsistencies: serde_json::Value = check_response
        .json()
        .await
        .context("deserialize response")?;

    // Only an empty JSON string counts as "no inconsistencies".
    if matches!(&inconsistencies, serde_json::Value::String(text) if text.is_empty()) {
        return Ok(());
    }
    Err(anyhow!("coordinator inconsistencies! {inconsistencies:?}"))
}
/// Compares the server's in-memory catalog dump with a dump of the on-disk catalog.
///
/// The in-memory dump is fetched as text from `GET /api/catalog/dump` on
/// `state.materialize.internal_http_addr`. Its `system_parameter_defaults` and
/// `storage_metadata.unfinalized_shards` entries are fed into
/// `State::with_catalog_copy` and the resulting state's `dump`, and the
/// resulting dump is compared with the fetched text.
///
/// The check is skipped (with a warning, returning `Ok(())`) when the fetched
/// dump has no `system_parameter_defaults`, or when no catalog state is found
/// on disk.
///
/// # Errors
///
/// Fails when the dump cannot be fetched or decoded, when the on-disk catalog
/// cannot be read, or when the two dumps differ. In the last case the error
/// carries at most 200 lines of a unified diff.
///
/// # Panics
///
/// Panics when `state.build_info.version` is not a valid semver version, or
/// when the on-disk catalog state cannot be dumped.
async fn check_catalog_state(state: &State) -> Result<(), anyhow::Error> {
    // Only the parts of the dump that are needed to rebuild the on-disk view.
    #[derive(Debug, Deserialize)]
    struct StorageMetadata {
        unfinalized_shards: Option<BTreeSet<String>>,
    }

    #[derive(Debug, Deserialize)]
    struct CatalogDump {
        system_parameter_defaults: Option<BTreeMap<String, String>>,
        storage_metadata: Option<StorageMetadata>,
    }

    let dump_url = format!(
        "http://{}/api/catalog/dump",
        state.materialize.internal_http_addr,
    );
    let response = reqwest::get(&dump_url).await.context("GET catalog")?;
    let memory_catalog = response.text().await.context("deserialize catalog")?;

    let in_memory: CatalogDump =
        serde_json::from_str(&memory_catalog).context("decoding catalog")?;

    let Some(system_parameter_defaults) = in_memory.system_parameter_defaults else {
        tracing::warn!(
            "Missing system_parameter_defaults in memory catalog state, skipping consistency check"
        );
        return Ok(());
    };
    let unfinalized_shards = in_memory
        .storage_metadata
        .and_then(|metadata| metadata.unfinalized_shards);

    let version = state
        .build_info
        .version
        .parse::<semver::Version>()
        .expect("invalid version");

    let on_disk_state = state
        .with_catalog_copy(system_parameter_defaults, version, |catalog| {
            catalog.state().clone()
        })
        .await
        .map_err(|e| anyhow!("failed to read on-disk catalog state: {e}"))?;
    let Some(on_disk_state) = on_disk_state else {
        tracing::warn!("No Catalog state on disk, skipping consistency check");
        return Ok(());
    };
    let disk_catalog = on_disk_state
        .dump(unfinalized_shards)
        .expect("state must be dumpable");

    if disk_catalog == memory_catalog {
        return Ok(());
    }

    // Keep the report bounded: generous context per hunk, but at most 200 lines.
    let rendered = similar::TextDiff::from_lines(&memory_catalog, &disk_catalog)
        .unified_diff()
        .context_radius(50)
        .to_string();
    let diff = rendered.lines().take(200).collect::<Vec<_>>().join("\n");
    Err(anyhow!(
        "the in-memory state of the catalog does not match its on-disk state:\n{diff}"
    ))
}
/// Checks that the persist shard named by `shard_id` is in a tombstone state.
///
/// The shard is inspected repeatedly, for at most `state.timeout`, until
/// `ShardState::is_tombstone` holds. Progress is printed to stdout while
/// waiting.
///
/// Returns `Ok(())` without checking anything (after logging a warning) when
/// either the persist consensus URL or the persist blob URL is unknown.
///
/// # Errors
///
/// Fails when the persist client cannot be opened, when `shard_id` is not a
/// valid `ShardId`, when inspecting the shard or converting its state fails
/// (no retry in that case), or when the shard is still not tombstoned once the
/// retry budget is used up.
async fn check_shard_tombstoned(state: &State, shard_id: &str) -> Result<(), anyhow::Error> {
// Echo the command being run.
println!("$ check-shard-tombstoned {shard_id}");
// Both URLs are needed to build a `PersistLocation`; without them the check is skipped.
let (Some(consensus_uri), Some(blob_uri)) =
(&state.persist_consensus_url, &state.persist_blob_url)
else {
tracing::warn!("Persist consensus or blob URL not known");
return Ok(());
};
let location = PersistLocation {
blob_uri: blob_uri.clone(),
consensus_uri: consensus_uri.clone(),
};
let client = state
.persist_clients
.open(location)
.await
.context("openning persist client")?;
let shard_id = ShardId::from_str(shard_id).map_err(|s| anyhow!("invalid ShardId: {s}"))?;
// The client is handed to the retry loop as its state: each attempt takes
// ownership of it and gives it back alongside the attempt's result.
let (_client, result) = Retry::default()
.max_duration(state.timeout)
.retry_async_with_state(client, |retry_state, client| async move {
// Inspect the shard, then round-trip the result through JSON to pick out
// only the fields declared on `ShardState`.
let inspect_state = client
.inspect_shard::<mz_repr::Timestamp>(&shard_id)
.await
.context("inspecting shard")
.and_then(|state| serde_json::to_value(state).context("to json"))
.and_then(|state| {
serde_json::from_value::<ShardState>(state).context("to shard state")
});
let result = match inspect_state {
// Tombstoned: done.
Ok(state) if state.is_tombstone() => RetryResult::Ok(()),
// Not tombstoned yet: report progress and ask for another attempt.
Ok(state) => {
// Print the explanation only on the first attempt ...
if retry_state.i == 0 {
print!("shard isn't tombstoned; sleeping to see if it gets cleaned up.");
}
// ... and append each upcoming non-zero backoff to the same line.
if let Some(backoff) = retry_state.next_backoff {
if !backoff.is_zero() {
print!(" {:.0?}", backoff);
}
}
// `print!` emits no newline, so flush explicitly -- presumably so the
// partial progress line becomes visible right away.
std::io::stdout().flush().expect("flushing stdout");
RetryResult::RetryableErr(anyhow!("non-tombstone state: {state:?}"))
}
// Failing to inspect or decode the shard state is not retried.
Result::Err(e) => RetryResult::FatalErr(e),
};
(client, result)
})
.await;
// The client handed back by the retry loop is no longer needed; only the outcome matters.
result
}
/// The parts of an inspected persist shard's state that the tombstone check looks at.
///
/// `check_shard_tombstoned` builds this by serializing the result of
/// `inspect_shard` to JSON and deserializing it into this struct, so the field
/// names must match the keys of that JSON.
///
/// NOTE(review): the meaning of the individual fields is defined by the persist
/// crate and is not visible in this file; the notes below go no further than
/// the field names and types.
#[derive(Debug, Serialize, Deserialize)]
struct ShardState {
/// Entries under the `leased_readers` key, keyed by string; values are kept as raw JSON.
leased_readers: BTreeMap<String, serde_json::Value>,
/// Entries under the `critical_readers` key, keyed by string; values are kept as raw JSON.
critical_readers: BTreeMap<String, serde_json::Value>,
/// Entries under the `writers` key, keyed by string; values are kept as raw JSON.
writers: BTreeMap<String, serde_json::Value>,
/// Timestamps under the `since` key.
since: Vec<mz_repr::Timestamp>,
/// Timestamps under the `upper` key.
upper: Vec<mz_repr::Timestamp>,
}
impl ShardState {
    /// Reports whether this state is a tombstone.
    ///
    /// That is the case exactly when `upper` and `since` are both empty and no
    /// writers, leased readers or critical readers are present.
    fn is_tombstone(&self) -> bool {
        let frontiers_empty = self.upper.is_empty() && self.since.is_empty();
        let handles_empty = self.writers.is_empty()
            && self.leased_readers.is_empty()
            && self.critical_readers.is_empty();
        frontiers_empty && handles_empty
    }
}