mz_testdrive/action/
consistency.rs1use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Write;
12use std::io::Write as _;
13use std::str::FromStr;
14use std::time::Duration;
15
16use anyhow::{Context, anyhow, bail};
17use mz_ore::retry::{Retry, RetryResult};
18use mz_persist_client::{PersistLocation, ShardId};
19use reqwest::StatusCode;
20use serde::{Deserialize, Serialize};
21
22use crate::action::{ControlFlow, State};
23use crate::parser::BuiltinCommand;
24
/// Granularity at which consistency checks are run.
///
/// Usable as a clap value (`clap::ValueEnum`) and also parseable from the
/// strings "file", "statement" and "disable" via its `FromStr` impl.
#[derive(clap::ValueEnum, Default, Debug, Copy, Clone, PartialEq, Eq)]
pub enum Level {
    // The default level. NOTE(review): presumably runs the checks once per
    // test file — confirm against the caller.
    #[default]
    File,
    // NOTE(review): presumably runs the checks after each statement — confirm.
    Statement,
    // NOTE(review): presumably turns consistency checks off — confirm.
    Disable,
}
36
37impl FromStr for Level {
38 type Err = String;
39
40 fn from_str(s: &str) -> Result<Self, Self::Err> {
41 match s {
42 "file" => Ok(Level::File),
43 "statement" => Ok(Level::Statement),
44 "disable" => Ok(Level::Disable),
45 s => Err(format!("Unknown consistency check level: {s}")),
46 }
47 }
48}
49
50pub fn skip_consistency_checks(
52 mut cmd: BuiltinCommand,
53 state: &mut State,
54) -> Result<ControlFlow, anyhow::Error> {
55 let reason = cmd
56 .args
57 .string("reason")
58 .context("must provide reason for skipping")?;
59 tracing::info!(reason, "Skipping consistency checks as requested.");
60
61 state.consistency_checks_adhoc_skip = true;
62 Ok(ControlFlow::Continue)
63}
64
65pub async fn run_consistency_checks(state: &State) -> Result<ControlFlow, anyhow::Error> {
68 if state.consistency_checks_adhoc_skip {
70 return Ok(ControlFlow::Continue);
71 }
72
73 let coordinator = check_coordinator(state).await.context("coordinator");
74 let catalog_state = check_catalog_state(state).await.context("catalog state");
75 let mut msg = String::new();
80 if let Err(e) = coordinator {
81 writeln!(&mut msg, "coordinator inconsistency: {e:?}")?;
82 }
83 if let Err(e) = catalog_state {
84 writeln!(&mut msg, "catalog inconsistency: {e:?}")?;
85 }
86
87 if msg.is_empty() {
88 Ok(ControlFlow::Continue)
89 } else {
90 Err(anyhow!("{msg}"))
91 }
92}
93
94pub async fn run_check_shard_tombstone(
98 mut cmd: BuiltinCommand,
99 state: &State,
100) -> Result<ControlFlow, anyhow::Error> {
101 let shard_id = cmd.args.string("shard-id")?;
102 check_shard_tombstone(state, &shard_id).await?;
103 Ok(ControlFlow::Continue)
104}
105
106async fn check_coordinator(state: &State) -> Result<(), anyhow::Error> {
108 let response = reqwest::get(&format!(
110 "http://{}/api/coordinator/dump",
111 state.materialize.internal_http_addr
112 ))
113 .await?;
114 if !response.status().is_success() && response.status() != StatusCode::NOT_FOUND {
116 let response: Result<serde_json::Value, _> = response.json().await;
117 bail!("Coordinator failed to dump state: {:?}", response);
118 }
119
120 let response = Retry::default()
122 .max_duration(Duration::from_secs(2))
123 .retry_async(|_| async {
124 reqwest::get(&format!(
125 "http://{}/api/coordinator/check",
126 state.materialize.internal_http_addr,
127 ))
128 .await
129 })
130 .await
131 .context("querying coordinator")?;
132 if response.status() == StatusCode::NOT_FOUND {
133 bail!("Coordinator consistency check not available");
134 }
135
136 let inconsistencies: serde_json::Value =
137 response.json().await.context("deserialize response")?;
138
139 match inconsistencies {
140 serde_json::Value::String(x) if x.is_empty() => Ok(()),
141 other => Err(anyhow!("coordinator inconsistencies! {other:?}")),
142 }
143}
144
145async fn check_catalog_state(state: &State) -> Result<(), anyhow::Error> {
147 #[derive(Debug, Deserialize)]
148 struct StorageMetadata {
149 unfinalized_shards: Option<BTreeSet<String>>,
150 }
151
152 #[derive(Debug, Deserialize)]
153 struct CatalogDump {
154 system_parameter_defaults: Option<BTreeMap<String, String>>,
155 storage_metadata: Option<StorageMetadata>,
156 }
157
158 let memory_catalog = reqwest::get(&format!(
161 "http://{}/api/catalog/dump",
162 state.materialize.internal_http_addr,
163 ))
164 .await
165 .context("GET catalog")?
166 .text()
167 .await
168 .context("deserialize catalog")?;
169
170 let dump: CatalogDump = serde_json::from_str(&memory_catalog).context("decoding catalog")?;
173
174 let Some(system_parameter_defaults) = dump.system_parameter_defaults else {
175 tracing::warn!(
178 "Missing system_parameter_defaults in memory catalog state, skipping consistency check"
179 );
180 return Ok(());
181 };
182
183 let unfinalized_shards = dump
184 .storage_metadata
185 .and_then(|storage_metadata| storage_metadata.unfinalized_shards);
186
187 let _: semver::Version = state.build_info.version.parse().expect("invalid version");
191
192 let maybe_disk_catalog = state
193 .with_catalog_copy(
194 system_parameter_defaults,
195 state.build_info,
196 &state.materialize.bootstrap_args,
197 Some(false),
199 |catalog| catalog.state().clone(),
200 )
201 .await
202 .map_err(|e| anyhow!("failed to read on-disk catalog state: {e}"))?
203 .map(|catalog| {
204 catalog
205 .dump(unfinalized_shards)
212 .expect("state must be dumpable")
213 });
214 let Some(disk_catalog) = maybe_disk_catalog else {
215 tracing::warn!("No Catalog state on disk, skipping consistency check");
218 return Ok(());
219 };
220
221 if disk_catalog != memory_catalog {
222 let diff = similar::TextDiff::from_lines(&memory_catalog, &disk_catalog)
225 .unified_diff()
226 .context_radius(50)
227 .to_string()
228 .lines()
229 .take(200)
230 .collect::<Vec<_>>()
231 .join("\n");
232
233 bail!("the in-memory state of the catalog does not match its on-disk state:\n{diff}");
234 }
235
236 Ok(())
237}
238
239async fn check_shard_tombstone(state: &State, shard_id: &str) -> Result<(), anyhow::Error> {
241 println!("$ check-shard-tombstone {shard_id}");
242
243 let (Some(consensus_uri), Some(blob_uri)) =
244 (&state.persist_consensus_url, &state.persist_blob_url)
245 else {
246 tracing::warn!("Persist consensus or blob URL not known");
248 return Ok(());
249 };
250
251 let location = PersistLocation {
252 blob_uri: blob_uri.clone(),
253 consensus_uri: consensus_uri.clone(),
254 };
255 let client = state
256 .persist_clients
257 .open(location)
258 .await
259 .context("openning persist client")?;
260 let shard_id = ShardId::from_str(shard_id).map_err(|s| anyhow!("invalid ShardId: {s}"))?;
261
262 let (_client, result) = Retry::default()
264 .max_duration(state.timeout)
265 .retry_async_with_state(client, |retry_state, client| async move {
266 let inspect_state = client
267 .inspect_shard::<mz_repr::Timestamp>(&shard_id)
268 .await
269 .context("inspecting shard")
270 .and_then(|state| serde_json::to_value(state).context("to json"))
271 .and_then(|state| {
272 serde_json::from_value::<ShardState>(state).context("to shard state")
273 });
274
275 let result = match inspect_state {
276 Ok(state) if state.is_tombstone() => RetryResult::Ok(()),
277 Ok(state) => {
278 if retry_state.i == 0 {
279 print!("shard isn't tombstoned; sleeping to see if it gets cleaned up.");
280 }
281 if let Some(backoff) = retry_state.next_backoff {
282 if !backoff.is_zero() {
283 print!(" {:.0?}", backoff);
284 }
285 }
286 std::io::stdout().flush().expect("flushing stdout");
287
288 RetryResult::RetryableErr(anyhow!("non-tombstone state: {state:?}"))
289 }
290 Result::Err(e) => RetryResult::FatalErr(e),
291 };
292
293 (client, result)
294 })
295 .await;
296
297 result
298}
299
/// The subset of a persist shard's inspected state needed to decide whether
/// the shard has been tombstoned.
///
/// Built by converting the result of `inspect_shard` to JSON and decoding it
/// into this struct; no `deny_unknown_fields` is set, so other fields are
/// ignored. Field order is kept as-is because the `Debug` output appears in
/// the "non-tombstone state" retry error.
#[derive(Debug, Serialize, Deserialize)]
struct ShardState {
    // Leased reader entries, kept as raw JSON. NOTE(review): keys are
    // presumably reader ids — confirm.
    leased_readers: BTreeMap<String, serde_json::Value>,
    // Critical reader entries, kept as raw JSON (keys presumably reader ids).
    critical_readers: BTreeMap<String, serde_json::Value>,
    // Writer entries, kept as raw JSON (keys presumably writer ids).
    writers: BTreeMap<String, serde_json::Value>,
    // NOTE(review): presumably the elements of the `since` frontier — confirm.
    since: Vec<mz_repr::Timestamp>,
    // NOTE(review): presumably the elements of the `upper` frontier — confirm.
    upper: Vec<mz_repr::Timestamp>,
}
309
310impl ShardState {
311 fn is_tombstone(&self) -> bool {
313 self.upper.is_empty()
314 && self.since.is_empty()
315 && self.writers.is_empty()
316 && self.leased_readers.is_empty()
317 && self.critical_readers.is_empty()
318 }
319}