// mz_testdrive/action/consistency.rs

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::io::Write as _;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use anyhow::{Context, anyhow, bail};
use mz_ore::retry::{Retry, RetryResult};
use mz_persist_client::{PersistLocation, ShardId};
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};

use crate::action::{ControlFlow, State};
use crate::parser::BuiltinCommand;

25#[derive(clap::ValueEnum, Default, Debug, Copy, Clone, PartialEq, Eq)]
27pub enum Level {
28    #[default]
30    File,
31    Statement,
33    Disable,
35}
36
37impl FromStr for Level {
38    type Err = String;
39
40    fn from_str(s: &str) -> Result<Self, Self::Err> {
41        match s {
42            "file" => Ok(Level::File),
43            "statement" => Ok(Level::Statement),
44            "disable" => Ok(Level::Disable),
45            s => Err(format!("Unknown consistency check level: {s}")),
46        }
47    }
48}
49
50pub fn skip_consistency_checks(
52    mut cmd: BuiltinCommand,
53    state: &mut State,
54) -> Result<ControlFlow, anyhow::Error> {
55    let reason = cmd
56        .args
57        .string("reason")
58        .context("must provide reason for skipping")?;
59    tracing::info!(reason, "Skipping consistency checks as requested.");
60
61    state.consistency_checks_adhoc_skip = true;
62    Ok(ControlFlow::Continue)
63}
64
65pub async fn run_consistency_checks(state: &State) -> Result<ControlFlow, anyhow::Error> {
68    if state.consistency_checks_adhoc_skip {
70        return Ok(ControlFlow::Continue);
71    }
72
73    let coordinator = check_coordinator(state).await.context("coordinator");
74    let catalog_state = check_catalog_state(state).await.context("catalog state");
75    let mut msg = String::new();
80    if let Err(e) = coordinator {
81        writeln!(&mut msg, "coordinator inconsistency: {e:?}")?;
82    }
83    if let Err(e) = catalog_state {
84        writeln!(&mut msg, "catalog inconsistency: {e:?}")?;
85    }
86
87    if msg.is_empty() {
88        Ok(ControlFlow::Continue)
89    } else {
90        Err(anyhow!("{msg}"))
91    }
92}
93
94pub async fn run_check_shard_tombstone(
98    mut cmd: BuiltinCommand,
99    state: &State,
100) -> Result<ControlFlow, anyhow::Error> {
101    let shard_id = cmd.args.string("shard-id")?;
102    check_shard_tombstone(state, &shard_id).await?;
103    Ok(ControlFlow::Continue)
104}
105
106async fn check_coordinator(state: &State) -> Result<(), anyhow::Error> {
108    let response = reqwest::get(&format!(
110        "http://{}/api/coordinator/dump",
111        state.materialize.internal_http_addr
112    ))
113    .await?;
114    if !response.status().is_success() && response.status() != StatusCode::NOT_FOUND {
116        let response: Result<serde_json::Value, _> = response.json().await;
117        bail!("Coordinator failed to dump state: {:?}", response);
118    }
119
120    let response = Retry::default()
122        .max_duration(Duration::from_secs(2))
123        .retry_async(|_| async {
124            reqwest::get(&format!(
125                "http://{}/api/coordinator/check",
126                state.materialize.internal_http_addr,
127            ))
128            .await
129        })
130        .await
131        .context("querying coordinator")?;
132    if response.status() == StatusCode::NOT_FOUND {
133        bail!("Coordinator consistency check not available");
134    }
135
136    let inconsistencies: serde_json::Value =
137        response.json().await.context("deserialize response")?;
138
139    match inconsistencies {
140        serde_json::Value::String(x) if x.is_empty() => Ok(()),
141        other => Err(anyhow!("coordinator inconsistencies! {other:?}")),
142    }
143}
144
145async fn check_catalog_state(state: &State) -> Result<(), anyhow::Error> {
147    #[derive(Debug, Deserialize)]
148    struct StorageMetadata {
149        unfinalized_shards: Option<BTreeSet<String>>,
150    }
151
152    #[derive(Debug, Deserialize)]
153    struct CatalogDump {
154        system_parameter_defaults: Option<BTreeMap<String, String>>,
155        storage_metadata: Option<StorageMetadata>,
156    }
157
158    let memory_catalog = reqwest::get(&format!(
161        "http://{}/api/catalog/dump",
162        state.materialize.internal_http_addr,
163    ))
164    .await
165    .context("GET catalog")?
166    .text()
167    .await
168    .context("deserialize catalog")?;
169
170    let dump: CatalogDump = serde_json::from_str(&memory_catalog).context("decoding catalog")?;
173
174    let Some(system_parameter_defaults) = dump.system_parameter_defaults else {
175        tracing::warn!(
178            "Missing system_parameter_defaults in memory catalog state, skipping consistency check"
179        );
180        return Ok(());
181    };
182
183    let unfinalized_shards = dump
184        .storage_metadata
185        .and_then(|storage_metadata| storage_metadata.unfinalized_shards);
186
187    let _: semver::Version = state.build_info.version.parse().expect("invalid version");
191
192    let maybe_disk_catalog = state
193        .with_catalog_copy(
194            system_parameter_defaults,
195            state.build_info,
196            &state.materialize.bootstrap_args,
197            Some(false),
199            |catalog| catalog.state().clone(),
200        )
201        .await
202        .map_err(|e| anyhow!("failed to read on-disk catalog state: {e}"))?
203        .map(|catalog| {
204            catalog
205                .dump(unfinalized_shards)
212                .expect("state must be dumpable")
213        });
214    let Some(disk_catalog) = maybe_disk_catalog else {
215        tracing::warn!("No Catalog state on disk, skipping consistency check");
218        return Ok(());
219    };
220
221    if disk_catalog != memory_catalog {
222        let diff = similar::TextDiff::from_lines(&memory_catalog, &disk_catalog)
225            .unified_diff()
226            .context_radius(50)
227            .to_string()
228            .lines()
229            .take(200)
230            .collect::<Vec<_>>()
231            .join("\n");
232
233        bail!("the in-memory state of the catalog does not match its on-disk state:\n{diff}");
234    }
235
236    Ok(())
237}
238
239async fn check_shard_tombstone(state: &State, shard_id: &str) -> Result<(), anyhow::Error> {
241    println!("$ check-shard-tombstone {shard_id}");
242
243    let (Some(consensus_uri), Some(blob_uri)) =
244        (&state.persist_consensus_url, &state.persist_blob_url)
245    else {
246        tracing::warn!("Persist consensus or blob URL not known");
248        return Ok(());
249    };
250
251    let location = PersistLocation {
252        blob_uri: blob_uri.clone(),
253        consensus_uri: consensus_uri.clone(),
254    };
255    let client = state
256        .persist_clients
257        .open(location)
258        .await
259        .context("openning persist client")?;
260    let shard_id = ShardId::from_str(shard_id).map_err(|s| anyhow!("invalid ShardId: {s}"))?;
261
262    let (_client, result) = Retry::default()
264        .max_duration(state.timeout)
265        .retry_async_with_state(client, |retry_state, client| async move {
266            let inspect_state = client
267                .inspect_shard::<mz_repr::Timestamp>(&shard_id)
268                .await
269                .context("inspecting shard")
270                .and_then(|state| serde_json::to_value(state).context("to json"))
271                .and_then(|state| {
272                    serde_json::from_value::<ShardState>(state).context("to shard state")
273                });
274
275            let result = match inspect_state {
276                Ok(state) if state.is_tombstone() => RetryResult::Ok(()),
277                Ok(state) => {
278                    if retry_state.i == 0 {
279                        print!("shard isn't tombstoned; sleeping to see if it gets cleaned up.");
280                    }
281                    if let Some(backoff) = retry_state.next_backoff {
282                        if !backoff.is_zero() {
283                            print!(" {:.0?}", backoff);
284                        }
285                    }
286                    std::io::stdout().flush().expect("flushing stdout");
287
288                    RetryResult::RetryableErr(anyhow!("non-tombstone state: {state:?}"))
289                }
290                Result::Err(e) => RetryResult::FatalErr(e),
291            };
292
293            (client, result)
294        })
295        .await;
296
297    result
298}
299
300#[derive(Debug, Serialize, Deserialize)]
302struct ShardState {
303    leased_readers: BTreeMap<String, serde_json::Value>,
304    critical_readers: BTreeMap<String, serde_json::Value>,
305    writers: BTreeMap<String, serde_json::Value>,
306    since: Vec<mz_repr::Timestamp>,
307    upper: Vec<mz_repr::Timestamp>,
308}
309
310impl ShardState {
311    fn is_tombstone(&self) -> bool {
313        self.upper.is_empty()
314            && self.since.is_empty()
315            && self.writers.is_empty()
316            && self.leased_readers.is_empty()
317            && self.critical_readers.is_empty()
318    }
319}