1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Write;
12use std::io::Write as _;
13use std::str::FromStr;
14use std::time::Duration;
1516use anyhow::{Context, anyhow, bail};
17use mz_ore::retry::{Retry, RetryResult};
18use mz_persist_client::{PersistLocation, ShardId};
19use reqwest::StatusCode;
20use serde::{Deserialize, Serialize};
2122use crate::action::{ControlFlow, State};
23use crate::parser::BuiltinCommand;
/// Level of consistency checks we should enable on a testdrive run.
///
/// The default level is [`Level::File`]. A level can also be parsed from its lowercase name
/// (`"file"`, `"statement"`, `"disable"`) through the `FromStr` implementation in this file.
// NOTE(review): because this type derives `clap::ValueEnum`, the variant doc comments below are
// presumably surfaced as command-line help text; confirm before rewording them.
#[derive(clap::ValueEnum, Default, Debug, Copy, Clone, PartialEq, Eq)]
pub enum Level {
    /// Run the consistency checks after the completion of a test file.
    #[default]
    File,
    /// Run the consistency checks after each statement, good for debugging.
    Statement,
    /// Disable consistency checks entirely.
    Disable,
}
3637impl FromStr for Level {
38type Err = String;
3940fn from_str(s: &str) -> Result<Self, Self::Err> {
41match s {
42"file" => Ok(Level::File),
43"statement" => Ok(Level::Statement),
44"disable" => Ok(Level::Disable),
45 s => Err(format!("Unknown consistency check level: {s}")),
46 }
47 }
48}
4950/// Skips consistency checks for the current file.
51pub fn skip_consistency_checks(
52mut cmd: BuiltinCommand,
53 state: &mut State,
54) -> Result<ControlFlow, anyhow::Error> {
55let reason = cmd
56 .args
57 .string("reason")
58 .context("must provide reason for skipping")?;
59tracing::info!(reason, "Skipping consistency checks as requested.");
6061 state.consistency_checks_adhoc_skip = true;
62Ok(ControlFlow::Continue)
63}
6465/// Runs consistency checks against multiple parts of Materialize to make sure we haven't violated
66/// our invariants or leaked resources.
67pub async fn run_consistency_checks(state: &State) -> Result<ControlFlow, anyhow::Error> {
68// Return early if the user adhoc disabled consistency checks for the current file.
69if state.consistency_checks_adhoc_skip {
70return Ok(ControlFlow::Continue);
71 }
7273let coordinator = check_coordinator(state).await.context("coordinator");
74let catalog_state = check_catalog_state(state).await.context("catalog state");
75// TODO(parkmycar): Fix subsources so they don't leak their shards and then add a leaked shards
76 // consistency check.
7778 // Make sure to report all inconsistencies, not just the first.
79let mut msg = String::new();
80if let Err(e) = coordinator {
81writeln!(&mut msg, "coordinator inconsistency: {e:?}")?;
82 }
83if let Err(e) = catalog_state {
84writeln!(&mut msg, "catalog inconsistency: {e:?}")?;
85 }
8687if msg.is_empty() {
88Ok(ControlFlow::Continue)
89 } else {
90Err(anyhow!("{msg}"))
91 }
92}
9394/// Checks if a shard in Persist has been tombstoned.
95///
96/// TODO(parkmycar): Run this as part of the consistency checks, instead of as a specific command.
97pub async fn run_check_shard_tombstone(
98mut cmd: BuiltinCommand,
99 state: &State,
100) -> Result<ControlFlow, anyhow::Error> {
101let shard_id = cmd.args.string("shard-id")?;
102 check_shard_tombstone(state, &shard_id).await?;
103Ok(ControlFlow::Continue)
104}
105106/// Asks the Coordinator to run it's own internal consistency checks.
107async fn check_coordinator(state: &State) -> Result<(), anyhow::Error> {
108// Make sure we can dump the Coordinator state.
109let response = reqwest::get(&format!(
110"http://{}/api/coordinator/dump",
111 state.materialize.internal_http_addr
112 ))
113 .await?;
114// We allow NOT_FOUND to support upgrade tests where this endpoint doesn't yet exist.
115if !response.status().is_success() && response.status() != StatusCode::NOT_FOUND {
116let response: Result<serde_json::Value, _> = response.json().await;
117bail!("Coordinator failed to dump state: {:?}", response);
118 }
119120// Run the consistency checks.
121let response = Retry::default()
122 .max_duration(Duration::from_secs(2))
123 .retry_async(|_| async {
124 reqwest::get(&format!(
125"http://{}/api/coordinator/check",
126 state.materialize.internal_http_addr,
127 ))
128 .await
129})
130 .await
131.context("querying coordinator")?;
132if response.status() == StatusCode::NOT_FOUND {
133bail!("Coordinator consistency check not available");
134 }
135136let inconsistencies: serde_json::Value =
137 response.json().await.context("deserialize response")?;
138139match inconsistencies {
140 serde_json::Value::String(x) if x.is_empty() => Ok(()),
141 other => Err(anyhow!("coordinator inconsistencies! {other:?}")),
142 }
143}
144145/// Checks that the in-memory catalog matches what we have persisted on disk.
146async fn check_catalog_state(state: &State) -> Result<(), anyhow::Error> {
147#[derive(Debug, Deserialize)]
148struct StorageMetadata {
149 unfinalized_shards: Option<BTreeSet<String>>,
150 }
151152#[derive(Debug, Deserialize)]
153struct CatalogDump {
154 system_parameter_defaults: Option<BTreeMap<String, String>>,
155 storage_metadata: Option<StorageMetadata>,
156 }
157158// Dump the in-memory catalog state of the Materialize environment that we're
159 // connected to.
160let memory_catalog = reqwest::get(&format!(
161"http://{}/api/catalog/dump",
162 state.materialize.internal_http_addr,
163 ))
164 .await
165.context("GET catalog")?
166.text()
167 .await
168.context("deserialize catalog")?;
169170// Pull out the system parameter defaults from the in-memory catalog, as we
171 // need to load the disk catalog with the same defaults.
172let dump: CatalogDump = serde_json::from_str(&memory_catalog).context("decoding catalog")?;
173174let Some(system_parameter_defaults) = dump.system_parameter_defaults else {
175// TODO(parkmycar, def-): Ideally this could be an error, but a lot of test suites fail. We
176 // should explicitly disable consistency check in these test suites.
177tracing::warn!(
178"Missing system_parameter_defaults in memory catalog state, skipping consistency check"
179);
180return Ok(());
181 };
182183let unfinalized_shards = dump
184 .storage_metadata
185 .and_then(|storage_metadata| storage_metadata.unfinalized_shards);
186187// Load the on-disk catalog and dump its state.
188189 // Make sure the version is parseable.
190let _: semver::Version = state.build_info.version.parse().expect("invalid version");
191192let maybe_disk_catalog = state
193 .with_catalog_copy(
194 system_parameter_defaults,
195 state.build_info,
196&state.materialize.bootstrap_args,
197// The expression cache can be taxing on the CPU and is unnecessary for consistency checks.
198Some(false),
199 |catalog| catalog.state().clone(),
200 )
201 .await
202.map_err(|e| anyhow!("failed to read on-disk catalog state: {e}"))?
203.map(|catalog| {
204 catalog
205// The set of unfinalized shards in the catalog are updated asynchronously by
206 // background processes. As a result, the value may legitimately change after
207 // fetching the memory catalog but before fetching the disk catalog, causing the
208 // comparison to fail. This is a gross hack that always sets the disk catalog's
209 // unfinalized shards equal to the memory catalog's unfinalized shards to ignore
210 // false negatives. Unfortunately, we also end up ignoring true negatives.
211.dump(unfinalized_shards)
212 .expect("state must be dumpable")
213 });
214let Some(disk_catalog) = maybe_disk_catalog else {
215// TODO(parkmycar, def-): Ideally this could be an error, but a lot of test suites fail. We
216 // should explicitly disable consistency check in these test suites.
217tracing::warn!("No Catalog state on disk, skipping consistency check");
218return Ok(());
219 };
220221if disk_catalog != memory_catalog {
222// The state objects here are around 100k lines pretty printed, so find the
223 // first lines that differs and show context around it.
224let diff = similar::TextDiff::from_lines(&memory_catalog, &disk_catalog)
225 .unified_diff()
226 .context_radius(50)
227 .to_string()
228 .lines()
229 .take(200)
230 .collect::<Vec<_>>()
231 .join("\n");
232233bail!("the in-memory state of the catalog does not match its on-disk state:\n{diff}");
234 }
235236Ok(())
237}
238239/// Checks if the provided `shard_id` is a tombstone, returning an error if it's not.
240async fn check_shard_tombstone(state: &State, shard_id: &str) -> Result<(), anyhow::Error> {
241println!("$ check-shard-tombstone {shard_id}");
242243let (Some(consensus_uri), Some(blob_uri)) =
244 (&state.persist_consensus_url, &state.persist_blob_url)
245else {
246// TODO(parkmycar): Testdrive on Cloud Test doesn't currently supply the Persist URLs.
247tracing::warn!("Persist consensus or blob URL not known");
248return Ok(());
249 };
250251let location = PersistLocation {
252 blob_uri: blob_uri.clone(),
253 consensus_uri: consensus_uri.clone(),
254 };
255let client = state
256 .persist_clients
257 .open(location)
258 .await
259.context("openning persist client")?;
260let shard_id = ShardId::from_str(shard_id).map_err(|s| anyhow!("invalid ShardId: {s}"))?;
261262// It might take the storage-controller a moment to drop it's handles, so do a couple retries.
263let (_client, result) = Retry::default()
264 .max_duration(state.timeout)
265 .retry_async_with_state(client, |retry_state, client| async move {
266let inspect_state = client
267 .inspect_shard::<mz_repr::Timestamp>(&shard_id)
268 .await
269.context("inspecting shard")
270 .and_then(|state| serde_json::to_value(state).context("to json"))
271 .and_then(|state| {
272 serde_json::from_value::<ShardState>(state).context("to shard state")
273 });
274275let result = match inspect_state {
276Ok(state) if state.is_tombstone() => RetryResult::Ok(()),
277Ok(state) => {
278if retry_state.i == 0 {
279print!("shard isn't tombstoned; sleeping to see if it gets cleaned up.");
280 }
281if let Some(backoff) = retry_state.next_backoff {
282if !backoff.is_zero() {
283print!(" {:.0?}", backoff);
284 }
285 }
286 std::io::stdout().flush().expect("flushing stdout");
287288 RetryResult::RetryableErr(anyhow!("non-tombstone state: {state:?}"))
289 }
290 Result::Err(e) => RetryResult::FatalErr(e),
291 };
292293 (client, result)
294 })
295 .await;
296297 result
298}
/// Parts of a shard's state that we read to determine if it's a tombstone.
///
/// In this file the struct is produced by converting the result of `inspect_shard` to JSON and
/// deserializing it, so the field names must match the keys of that JSON.
#[derive(Debug, Serialize, Deserialize)]
struct ShardState {
    // Reader and writer collections. The values are kept as raw JSON and never interpreted here;
    // `is_tombstone` only checks that each map is empty.
    leased_readers: BTreeMap<String, serde_json::Value>,
    critical_readers: BTreeMap<String, serde_json::Value>,
    writers: BTreeMap<String, serde_json::Value>,
    // `is_tombstone` requires both of these to be empty.
    // NOTE(review): presumably the shard's since and upper frontiers; confirm against Persist.
    since: Vec<mz_repr::Timestamp>,
    upper: Vec<mz_repr::Timestamp>,
}
309310impl ShardState {
311/// Returns if this shard is currently a tombstsone.
312fn is_tombstone(&self) -> bool {
313self.upper.is_empty()
314 && self.since.is_empty()
315 && self.writers.is_empty()
316 && self.leased_readers.is_empty()
317 && self.critical_readers.is_empty()
318 }
319}