1use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Write;
12use std::io::Write as _;
13use std::str::FromStr;
14use std::time::Duration;
15
16use crate::action;
17use crate::action::{ControlFlow, Run, State};
18use crate::parser::{BuiltinCommand, LineReader, parse};
19use anyhow::{Context, anyhow, bail};
20use mz_ore::retry::{Retry, RetryResult};
21use mz_persist_client::{PersistLocation, ShardId};
22use mz_postgres_util::{query_one, sql};
23use reqwest::StatusCode;
24use serde::{Deserialize, Serialize};
25
26#[derive(clap::ValueEnum, Default, Debug, Copy, Clone, PartialEq, Eq)]
28pub enum Level {
29 #[default]
31 File,
32 Statement,
34 Disable,
36}
37
38impl FromStr for Level {
39 type Err = String;
40
41 fn from_str(s: &str) -> Result<Self, Self::Err> {
42 match s {
43 "file" => Ok(Level::File),
44 "statement" => Ok(Level::Statement),
45 "disable" => Ok(Level::Disable),
46 s => Err(format!("Unknown consistency check level: {s}")),
47 }
48 }
49}
50
51pub fn skip_consistency_checks(
53 mut cmd: BuiltinCommand,
54 state: &mut State,
55) -> Result<ControlFlow, anyhow::Error> {
56 let reason = cmd
57 .args
58 .string("reason")
59 .context("must provide reason for skipping")?;
60 tracing::info!(reason, "Skipping consistency checks as requested.");
61
62 state.consistency_checks_adhoc_skip = true;
63 Ok(ControlFlow::Continue)
64}
65
66pub async fn run_consistency_checks(state: &State) -> Result<ControlFlow, anyhow::Error> {
69 if state.consistency_checks_adhoc_skip {
71 return Ok(ControlFlow::Continue);
72 }
73
74 let coordinator = check_coordinator(state).await.context("coordinator");
75 let catalog_state = check_catalog_state(state).await.context("catalog state");
76 let statement_logging_state = if state.check_statement_logging {
77 check_statement_logging(state)
78 .await
79 .context("statement logging state")
80 } else {
81 Ok(())
82 };
83 let mut msg = String::new();
88 if let Err(e) = coordinator {
89 writeln!(&mut msg, "coordinator inconsistency: {e:?}")?;
90 }
91 if let Err(e) = catalog_state {
92 writeln!(&mut msg, "catalog inconsistency: {e:?}")?;
93 }
94 if let Err(e) = statement_logging_state {
95 writeln!(&mut msg, "statement logging inconsistency: {e:?}")?;
96 }
97
98 if msg.is_empty() {
99 Ok(ControlFlow::Continue)
100 } else {
101 Err(anyhow!("{msg}"))
102 }
103}
104
105pub async fn run_check_shard_tombstone(
109 mut cmd: BuiltinCommand,
110 state: &State,
111) -> Result<ControlFlow, anyhow::Error> {
112 let shard_id = cmd.args.string("shard-id")?;
113 check_shard_tombstone(state, &shard_id).await?;
114 Ok(ControlFlow::Continue)
115}
116
117async fn check_coordinator(state: &State) -> Result<(), anyhow::Error> {
119 let response = reqwest::get(&format!(
121 "http://{}/api/coordinator/dump",
122 state.materialize.internal_http_addr
123 ))
124 .await?;
125 if !response.status().is_success() && response.status() != StatusCode::NOT_FOUND {
127 let response: Result<serde_json::Value, _> = response.json().await;
128 bail!("Coordinator failed to dump state: {:?}", response);
129 }
130
131 let response = Retry::default()
133 .max_duration(Duration::from_secs(2))
134 .retry_async(|_| async {
135 reqwest::get(&format!(
136 "http://{}/api/coordinator/check",
137 state.materialize.internal_http_addr,
138 ))
139 .await
140 })
141 .await
142 .context("querying coordinator")?;
143 if response.status() == StatusCode::NOT_FOUND {
144 bail!("Coordinator consistency check not available");
145 }
146
147 let inconsistencies: serde_json::Value =
148 response.json().await.context("deserialize response")?;
149
150 match inconsistencies {
151 serde_json::Value::String(x) if x.is_empty() => Ok(()),
152 other => Err(anyhow!("coordinator inconsistencies! {other:?}")),
153 }
154}
155
156async fn check_catalog_state(state: &State) -> Result<(), anyhow::Error> {
158 #[derive(Debug, Deserialize)]
159 struct StorageMetadata {
160 unfinalized_shards: Option<BTreeSet<String>>,
161 }
162
163 #[derive(Debug, Deserialize)]
164 struct CatalogDump {
165 system_parameter_defaults: Option<BTreeMap<String, String>>,
166 storage_metadata: Option<StorageMetadata>,
167 }
168
169 let memory_catalog = reqwest::get(&format!(
172 "http://{}/api/catalog/dump",
173 state.materialize.internal_http_addr,
174 ))
175 .await
176 .context("GET catalog")?
177 .text()
178 .await
179 .context("deserialize catalog")?;
180
181 let dump: CatalogDump = serde_json::from_str(&memory_catalog).context("decoding catalog")?;
184
185 let Some(system_parameter_defaults) = dump.system_parameter_defaults else {
186 tracing::warn!(
189 "Missing system_parameter_defaults in memory catalog state, skipping consistency check"
190 );
191 return Ok(());
192 };
193
194 let unfinalized_shards = dump
195 .storage_metadata
196 .and_then(|storage_metadata| storage_metadata.unfinalized_shards);
197
198 let _: semver::Version = state.build_info.version.parse().expect("invalid version");
202
203 let maybe_disk_catalog = state
204 .with_catalog_copy(
205 system_parameter_defaults,
206 state.build_info,
207 &state.materialize.bootstrap_args,
208 Some(false),
210 |catalog| catalog.state().clone(),
211 )
212 .await
213 .map_err(|e| anyhow!("failed to read on-disk catalog state: {e}"))?
214 .map(|catalog| {
215 catalog
216 .dump(unfinalized_shards)
223 .expect("state must be dumpable")
224 });
225 let Some(disk_catalog) = maybe_disk_catalog else {
226 tracing::warn!("No Catalog state on disk, skipping consistency check");
229 return Ok(());
230 };
231
232 if disk_catalog != memory_catalog {
233 let diff = similar::TextDiff::from_lines(&memory_catalog, &disk_catalog)
236 .unified_diff()
237 .context_radius(50)
238 .to_string()
239 .lines()
240 .take(200)
241 .collect::<Vec<_>>()
242 .join("\n");
243
244 bail!("the in-memory state of the catalog does not match its on-disk state:\n{diff}");
245 }
246
247 Ok(())
248}
249
250async fn check_statement_logging(orig_state: &State) -> Result<(), anyhow::Error> {
264 use crate::util::postgres::postgres_client;
265
266 let (mut state, state_cleanup) = action::create_state(&orig_state.config).await?;
269
270 let mz_system_url = format!(
272 "postgres://mz_system:materialize@{}",
273 state.materialize.internal_sql_addr
274 );
275
276 let (client, _handle) = postgres_client(&mz_system_url, state.default_timeout)
277 .await
278 .context("connecting as mz_system to query enable_rbac_checks")?;
279
280 let row = query_one(&client, sql!("SHOW enable_rbac_checks"), &[])
281 .await
282 .context("querying enable_rbac_checks")?;
283
284 let original_value: String = row.get(0);
285
286 let check_script = format!(
290 r#"
291$ postgres-execute connection=postgres://mz_system:materialize@{0}
292ALTER SYSTEM SET enable_rbac_checks = false
293
294> SELECT count(*)
295 FROM mz_internal.mz_recent_activity_log
296 WHERE
297 (finished_at IS NULL OR finished_status IS NULL)
298 AND sql NOT LIKE '%__FILTER-OUT-THIS-QUERY__%'
299 AND finished_status != 'aborted';
3000
301
302$ postgres-execute connection=postgres://mz_system:materialize@{0}
303ALTER SYSTEM SET enable_rbac_checks = {1}
304"#,
305 state.materialize.internal_sql_addr, original_value
306 );
307
308 let mut line_reader = LineReader::new(&check_script);
309 let cmds = parse(&mut line_reader).map_err(|e| anyhow!("{}", e.source))?;
310
311 for cmd in cmds {
312 cmd.run(&mut state)
313 .await
314 .map_err(|e| anyhow!("{}", e.source))?;
315 }
316
317 drop(state);
318 state_cleanup.await?;
319
320 Ok(())
321}
322
323async fn check_shard_tombstone(state: &State, shard_id: &str) -> Result<(), anyhow::Error> {
325 println!("$ check-shard-tombstone {shard_id}");
326
327 let (Some(consensus_uri), Some(blob_uri)) =
328 (&state.persist_consensus_url, &state.persist_blob_url)
329 else {
330 tracing::warn!("Persist consensus or blob URL not known");
332 return Ok(());
333 };
334
335 let location = PersistLocation {
336 blob_uri: blob_uri.clone(),
337 consensus_uri: consensus_uri.clone(),
338 };
339 let client = state
340 .persist_clients
341 .open(location)
342 .await
343 .context("openning persist client")?;
344 let shard_id = ShardId::from_str(shard_id).map_err(|s| anyhow!("invalid ShardId: {s}"))?;
345
346 let (_client, result) = Retry::default()
348 .max_duration(state.timeout)
349 .retry_async_with_state(client, |retry_state, client| async move {
350 let inspect_state = client
351 .inspect_shard::<mz_repr::Timestamp>(&shard_id)
352 .await
353 .context("inspecting shard")
354 .and_then(|state| serde_json::to_value(state).context("to json"))
355 .and_then(|state| {
356 serde_json::from_value::<ShardState>(state).context("to shard state")
357 });
358
359 let result = match inspect_state {
360 Ok(state) if state.is_tombstone() => RetryResult::Ok(()),
361 Ok(state) => {
362 if retry_state.i == 0 {
363 print!("shard isn't tombstoned; sleeping to see if it gets cleaned up.");
364 }
365 if let Some(backoff) = retry_state.next_backoff {
366 if !backoff.is_zero() {
367 print!(" {:.0?}", backoff);
368 }
369 }
370 std::io::stdout().flush().expect("flushing stdout");
371
372 RetryResult::RetryableErr(anyhow!("non-tombstone state: {state:?}"))
373 }
374 Result::Err(e) => RetryResult::FatalErr(e),
375 };
376
377 (client, result)
378 })
379 .await;
380
381 result
382}
383
384#[derive(Debug, Serialize, Deserialize)]
386struct ShardState {
387 leased_readers: BTreeMap<String, serde_json::Value>,
388 critical_readers: BTreeMap<String, serde_json::Value>,
389 writers: BTreeMap<String, serde_json::Value>,
390 since: Vec<mz_repr::Timestamp>,
391 upper: Vec<mz_repr::Timestamp>,
392}
393
394impl ShardState {
395 fn is_tombstone(&self) -> bool {
397 self.upper.is_empty()
398 && self.since.is_empty()
399 && self.writers.is_empty()
400 && self.leased_readers.is_empty()
401 && self.critical_readers.is_empty()
402 }
403}