1use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Write;
12use std::io::Write as _;
13use std::str::FromStr;
14use std::time::Duration;
15
16use crate::action;
17use crate::action::{ControlFlow, Run, State};
18use crate::parser::{BuiltinCommand, LineReader, parse};
19use anyhow::{Context, anyhow, bail};
20use mz_ore::retry::{Retry, RetryResult};
21use mz_persist_client::{PersistLocation, ShardId};
22use reqwest::StatusCode;
23use serde::{Deserialize, Serialize};
24
/// Level at which consistency checks are requested.
///
/// The type can be selected on the command line (via `clap::ValueEnum`) and
/// parsed from the lowercase strings accepted by its `FromStr` implementation.
#[derive(clap::ValueEnum, Default, Debug, Copy, Clone, PartialEq, Eq)]
pub enum Level {
    /// The default level.
    ///
    /// NOTE(review): presumably runs the checks once per test file; the call
    /// sites are not visible here, so confirm.
    #[default]
    File,
    /// NOTE(review): presumably runs the checks after every statement; confirm
    /// against the call sites.
    Statement,
    /// NOTE(review): presumably turns the checks off entirely; confirm against
    /// the call sites.
    Disable,
}
36
37impl FromStr for Level {
38 type Err = String;
39
40 fn from_str(s: &str) -> Result<Self, Self::Err> {
41 match s {
42 "file" => Ok(Level::File),
43 "statement" => Ok(Level::Statement),
44 "disable" => Ok(Level::Disable),
45 s => Err(format!("Unknown consistency check level: {s}")),
46 }
47 }
48}
49
50pub fn skip_consistency_checks(
52 mut cmd: BuiltinCommand,
53 state: &mut State,
54) -> Result<ControlFlow, anyhow::Error> {
55 let reason = cmd
56 .args
57 .string("reason")
58 .context("must provide reason for skipping")?;
59 tracing::info!(reason, "Skipping consistency checks as requested.");
60
61 state.consistency_checks_adhoc_skip = true;
62 Ok(ControlFlow::Continue)
63}
64
65pub async fn run_consistency_checks(state: &State) -> Result<ControlFlow, anyhow::Error> {
68 if state.consistency_checks_adhoc_skip {
70 return Ok(ControlFlow::Continue);
71 }
72
73 let coordinator = check_coordinator(state).await.context("coordinator");
74 let catalog_state = check_catalog_state(state).await.context("catalog state");
75 let statement_logging_state = if state.check_statement_logging {
76 check_statement_logging(state)
77 .await
78 .context("statement logging state")
79 } else {
80 Ok(())
81 };
82 let mut msg = String::new();
87 if let Err(e) = coordinator {
88 writeln!(&mut msg, "coordinator inconsistency: {e:?}")?;
89 }
90 if let Err(e) = catalog_state {
91 writeln!(&mut msg, "catalog inconsistency: {e:?}")?;
92 }
93 if let Err(e) = statement_logging_state {
94 writeln!(&mut msg, "statement logging inconsistency: {e:?}")?;
95 }
96
97 if msg.is_empty() {
98 Ok(ControlFlow::Continue)
99 } else {
100 Err(anyhow!("{msg}"))
101 }
102}
103
104pub async fn run_check_shard_tombstone(
108 mut cmd: BuiltinCommand,
109 state: &State,
110) -> Result<ControlFlow, anyhow::Error> {
111 let shard_id = cmd.args.string("shard-id")?;
112 check_shard_tombstone(state, &shard_id).await?;
113 Ok(ControlFlow::Continue)
114}
115
116async fn check_coordinator(state: &State) -> Result<(), anyhow::Error> {
118 let response = reqwest::get(&format!(
120 "http://{}/api/coordinator/dump",
121 state.materialize.internal_http_addr
122 ))
123 .await?;
124 if !response.status().is_success() && response.status() != StatusCode::NOT_FOUND {
126 let response: Result<serde_json::Value, _> = response.json().await;
127 bail!("Coordinator failed to dump state: {:?}", response);
128 }
129
130 let response = Retry::default()
132 .max_duration(Duration::from_secs(2))
133 .retry_async(|_| async {
134 reqwest::get(&format!(
135 "http://{}/api/coordinator/check",
136 state.materialize.internal_http_addr,
137 ))
138 .await
139 })
140 .await
141 .context("querying coordinator")?;
142 if response.status() == StatusCode::NOT_FOUND {
143 bail!("Coordinator consistency check not available");
144 }
145
146 let inconsistencies: serde_json::Value =
147 response.json().await.context("deserialize response")?;
148
149 match inconsistencies {
150 serde_json::Value::String(x) if x.is_empty() => Ok(()),
151 other => Err(anyhow!("coordinator inconsistencies! {other:?}")),
152 }
153}
154
155async fn check_catalog_state(state: &State) -> Result<(), anyhow::Error> {
157 #[derive(Debug, Deserialize)]
158 struct StorageMetadata {
159 unfinalized_shards: Option<BTreeSet<String>>,
160 }
161
162 #[derive(Debug, Deserialize)]
163 struct CatalogDump {
164 system_parameter_defaults: Option<BTreeMap<String, String>>,
165 storage_metadata: Option<StorageMetadata>,
166 }
167
168 let memory_catalog = reqwest::get(&format!(
171 "http://{}/api/catalog/dump",
172 state.materialize.internal_http_addr,
173 ))
174 .await
175 .context("GET catalog")?
176 .text()
177 .await
178 .context("deserialize catalog")?;
179
180 let dump: CatalogDump = serde_json::from_str(&memory_catalog).context("decoding catalog")?;
183
184 let Some(system_parameter_defaults) = dump.system_parameter_defaults else {
185 tracing::warn!(
188 "Missing system_parameter_defaults in memory catalog state, skipping consistency check"
189 );
190 return Ok(());
191 };
192
193 let unfinalized_shards = dump
194 .storage_metadata
195 .and_then(|storage_metadata| storage_metadata.unfinalized_shards);
196
197 let _: semver::Version = state.build_info.version.parse().expect("invalid version");
201
202 let maybe_disk_catalog = state
203 .with_catalog_copy(
204 system_parameter_defaults,
205 state.build_info,
206 &state.materialize.bootstrap_args,
207 Some(false),
209 |catalog| catalog.state().clone(),
210 )
211 .await
212 .map_err(|e| anyhow!("failed to read on-disk catalog state: {e}"))?
213 .map(|catalog| {
214 catalog
215 .dump(unfinalized_shards)
222 .expect("state must be dumpable")
223 });
224 let Some(disk_catalog) = maybe_disk_catalog else {
225 tracing::warn!("No Catalog state on disk, skipping consistency check");
228 return Ok(());
229 };
230
231 if disk_catalog != memory_catalog {
232 let diff = similar::TextDiff::from_lines(&memory_catalog, &disk_catalog)
235 .unified_diff()
236 .context_radius(50)
237 .to_string()
238 .lines()
239 .take(200)
240 .collect::<Vec<_>>()
241 .join("\n");
242
243 bail!("the in-memory state of the catalog does not match its on-disk state:\n{diff}");
244 }
245
246 Ok(())
247}
248
249async fn check_statement_logging(orig_state: &State) -> Result<(), anyhow::Error> {
263 use crate::util::postgres::postgres_client;
264
265 let (mut state, state_cleanup) = action::create_state(&orig_state.config).await?;
268
269 let mz_system_url = format!(
271 "postgres://mz_system:materialize@{}",
272 state.materialize.internal_sql_addr
273 );
274
275 let (client, _handle) = postgres_client(&mz_system_url, state.default_timeout)
276 .await
277 .context("connecting as mz_system to query enable_rbac_checks")?;
278
279 let row = client
280 .query_one("SHOW enable_rbac_checks", &[])
281 .await
282 .context("querying enable_rbac_checks")?;
283
284 let original_value: String = row.get(0);
285
286 let check_script = format!(
290 r#"
291$ postgres-execute connection=postgres://mz_system:materialize@{0}
292ALTER SYSTEM SET enable_rbac_checks = false
293
294> SELECT count(*)
295 FROM mz_internal.mz_recent_activity_log
296 WHERE
297 (finished_at IS NULL OR finished_status IS NULL)
298 AND sql NOT LIKE '%__FILTER-OUT-THIS-QUERY__%'
299 AND finished_status != 'aborted';
3000
301
302$ postgres-execute connection=postgres://mz_system:materialize@{0}
303ALTER SYSTEM SET enable_rbac_checks = {1}
304"#,
305 state.materialize.internal_sql_addr, original_value
306 );
307
308 let mut line_reader = LineReader::new(&check_script);
309 let cmds = parse(&mut line_reader).map_err(|e| anyhow!("{}", e.source))?;
310
311 for cmd in cmds {
312 cmd.run(&mut state)
313 .await
314 .map_err(|e| anyhow!("{}", e.source))?;
315 }
316
317 drop(state);
318 state_cleanup.await?;
319
320 Ok(())
321}
322
323async fn check_shard_tombstone(state: &State, shard_id: &str) -> Result<(), anyhow::Error> {
325 println!("$ check-shard-tombstone {shard_id}");
326
327 let (Some(consensus_uri), Some(blob_uri)) =
328 (&state.persist_consensus_url, &state.persist_blob_url)
329 else {
330 tracing::warn!("Persist consensus or blob URL not known");
332 return Ok(());
333 };
334
335 let location = PersistLocation {
336 blob_uri: blob_uri.clone(),
337 consensus_uri: consensus_uri.clone(),
338 };
339 let client = state
340 .persist_clients
341 .open(location)
342 .await
343 .context("openning persist client")?;
344 let shard_id = ShardId::from_str(shard_id).map_err(|s| anyhow!("invalid ShardId: {s}"))?;
345
346 let (_client, result) = Retry::default()
348 .max_duration(state.timeout)
349 .retry_async_with_state(client, |retry_state, client| async move {
350 let inspect_state = client
351 .inspect_shard::<mz_repr::Timestamp>(&shard_id)
352 .await
353 .context("inspecting shard")
354 .and_then(|state| serde_json::to_value(state).context("to json"))
355 .and_then(|state| {
356 serde_json::from_value::<ShardState>(state).context("to shard state")
357 });
358
359 let result = match inspect_state {
360 Ok(state) if state.is_tombstone() => RetryResult::Ok(()),
361 Ok(state) => {
362 if retry_state.i == 0 {
363 print!("shard isn't tombstoned; sleeping to see if it gets cleaned up.");
364 }
365 if let Some(backoff) = retry_state.next_backoff {
366 if !backoff.is_zero() {
367 print!(" {:.0?}", backoff);
368 }
369 }
370 std::io::stdout().flush().expect("flushing stdout");
371
372 RetryResult::RetryableErr(anyhow!("non-tombstone state: {state:?}"))
373 }
374 Result::Err(e) => RetryResult::FatalErr(e),
375 };
376
377 (client, result)
378 })
379 .await;
380
381 result
382}
383
/// The shard state fields the tombstone check looks at.
///
/// `check_shard_tombstone` builds this by converting the value returned from
/// `inspect_shard` to JSON and deserializing that JSON into this struct.
#[derive(Debug, Serialize, Deserialize)]
struct ShardState {
    /// Leased readers, keyed by string, with each entry kept as opaque JSON.
    /// NOTE(review): keys are presumably reader ids; confirm.
    leased_readers: BTreeMap<String, serde_json::Value>,
    /// Critical readers, keyed by string, with each entry kept as opaque JSON.
    critical_readers: BTreeMap<String, serde_json::Value>,
    /// Writers, keyed by string, with each entry kept as opaque JSON.
    writers: BTreeMap<String, serde_json::Value>,
    /// Timestamps of the `since` field; must be empty for a tombstone.
    since: Vec<mz_repr::Timestamp>,
    /// Timestamps of the `upper` field; must be empty for a tombstone.
    upper: Vec<mz_repr::Timestamp>,
}
393
394impl ShardState {
395 fn is_tombstone(&self) -> bool {
397 self.upper.is_empty()
398 && self.since.is_empty()
399 && self.writers.is_empty()
400 && self.leased_readers.is_empty()
401 && self.critical_readers.is_empty()
402 }
403}