mz_testdrive/action/
consistency.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Write;
12use std::io::Write as _;
13use std::str::FromStr;
14use std::time::Duration;
15
16use anyhow::{Context, anyhow, bail};
17use mz_ore::retry::{Retry, RetryResult};
18use mz_persist_client::{PersistLocation, ShardId};
19use reqwest::StatusCode;
20use serde::{Deserialize, Serialize};
21
22use crate::action::{ControlFlow, State};
23use crate::parser::BuiltinCommand;
24
25/// Level of consistency checks we should enable on a testdrive run.
26#[derive(clap::ValueEnum, Default, Debug, Copy, Clone, PartialEq, Eq)]
27pub enum Level {
28    /// Run the consistency checks after the completion of a test file.
29    #[default]
30    File,
31    /// Run the consistency checks after each statement, good for debugging.
32    Statement,
33    /// Disable consistency checks entirely.
34    Disable,
35}
36
37impl FromStr for Level {
38    type Err = String;
39
40    fn from_str(s: &str) -> Result<Self, Self::Err> {
41        match s {
42            "file" => Ok(Level::File),
43            "statement" => Ok(Level::Statement),
44            "disable" => Ok(Level::Disable),
45            s => Err(format!("Unknown consistency check level: {s}")),
46        }
47    }
48}
49
50/// Skips consistency checks for the current file.
51pub fn skip_consistency_checks(
52    mut cmd: BuiltinCommand,
53    state: &mut State,
54) -> Result<ControlFlow, anyhow::Error> {
55    let reason = cmd
56        .args
57        .string("reason")
58        .context("must provide reason for skipping")?;
59    tracing::info!(reason, "Skipping consistency checks as requested.");
60
61    state.consistency_checks_adhoc_skip = true;
62    Ok(ControlFlow::Continue)
63}
64
65/// Runs consistency checks against multiple parts of Materialize to make sure we haven't violated
66/// our invariants or leaked resources.
67pub async fn run_consistency_checks(state: &State) -> Result<ControlFlow, anyhow::Error> {
68    // Return early if the user adhoc disabled consistency checks for the current file.
69    if state.consistency_checks_adhoc_skip {
70        return Ok(ControlFlow::Continue);
71    }
72
73    let coordinator = check_coordinator(state).await.context("coordinator");
74    let catalog_state = check_catalog_state(state).await.context("catalog state");
75    // TODO(parkmycar): Fix subsources so they don't leak their shards and then add a leaked shards
76    // consistency check.
77
78    // Make sure to report all inconsistencies, not just the first.
79    let mut msg = String::new();
80    if let Err(e) = coordinator {
81        writeln!(&mut msg, "coordinator inconsistency: {e:?}")?;
82    }
83    if let Err(e) = catalog_state {
84        writeln!(&mut msg, "catalog inconsistency: {e:?}")?;
85    }
86
87    if msg.is_empty() {
88        Ok(ControlFlow::Continue)
89    } else {
90        Err(anyhow!("{msg}"))
91    }
92}
93
94/// Checks if a shard in Persist has been tombstoned.
95///
96/// TODO(parkmycar): Run this as part of the consistency checks, instead of as a specific command.
97pub async fn run_check_shard_tombstone(
98    mut cmd: BuiltinCommand,
99    state: &State,
100) -> Result<ControlFlow, anyhow::Error> {
101    let shard_id = cmd.args.string("shard-id")?;
102    check_shard_tombstone(state, &shard_id).await?;
103    Ok(ControlFlow::Continue)
104}
105
106/// Asks the Coordinator to run it's own internal consistency checks.
107async fn check_coordinator(state: &State) -> Result<(), anyhow::Error> {
108    // Make sure we can dump the Coordinator state.
109    let response = reqwest::get(&format!(
110        "http://{}/api/coordinator/dump",
111        state.materialize.internal_http_addr
112    ))
113    .await?;
114    // We allow NOT_FOUND to support upgrade tests where this endpoint doesn't yet exist.
115    if !response.status().is_success() && response.status() != StatusCode::NOT_FOUND {
116        let response: Result<serde_json::Value, _> = response.json().await;
117        bail!("Coordinator failed to dump state: {:?}", response);
118    }
119
120    // Run the consistency checks.
121    let response = Retry::default()
122        .max_duration(Duration::from_secs(2))
123        .retry_async(|_| async {
124            reqwest::get(&format!(
125                "http://{}/api/coordinator/check",
126                state.materialize.internal_http_addr,
127            ))
128            .await
129        })
130        .await
131        .context("querying coordinator")?;
132    if response.status() == StatusCode::NOT_FOUND {
133        bail!("Coordinator consistency check not available");
134    }
135
136    let inconsistencies: serde_json::Value =
137        response.json().await.context("deserialize response")?;
138
139    match inconsistencies {
140        serde_json::Value::String(x) if x.is_empty() => Ok(()),
141        other => Err(anyhow!("coordinator inconsistencies! {other:?}")),
142    }
143}
144
145/// Checks that the in-memory catalog matches what we have persisted on disk.
146async fn check_catalog_state(state: &State) -> Result<(), anyhow::Error> {
147    #[derive(Debug, Deserialize)]
148    struct StorageMetadata {
149        unfinalized_shards: Option<BTreeSet<String>>,
150    }
151
152    #[derive(Debug, Deserialize)]
153    struct CatalogDump {
154        system_parameter_defaults: Option<BTreeMap<String, String>>,
155        storage_metadata: Option<StorageMetadata>,
156    }
157
158    // Dump the in-memory catalog state of the Materialize environment that we're
159    // connected to.
160    let memory_catalog = reqwest::get(&format!(
161        "http://{}/api/catalog/dump",
162        state.materialize.internal_http_addr,
163    ))
164    .await
165    .context("GET catalog")?
166    .text()
167    .await
168    .context("deserialize catalog")?;
169
170    // Pull out the system parameter defaults from the in-memory catalog, as we
171    // need to load the disk catalog with the same defaults.
172    let dump: CatalogDump = serde_json::from_str(&memory_catalog).context("decoding catalog")?;
173
174    let Some(system_parameter_defaults) = dump.system_parameter_defaults else {
175        // TODO(parkmycar, def-): Ideally this could be an error, but a lot of test suites fail. We
176        // should explicitly disable consistency check in these test suites.
177        tracing::warn!(
178            "Missing system_parameter_defaults in memory catalog state, skipping consistency check"
179        );
180        return Ok(());
181    };
182
183    let unfinalized_shards = dump
184        .storage_metadata
185        .and_then(|storage_metadata| storage_metadata.unfinalized_shards);
186
187    // Load the on-disk catalog and dump its state.
188
189    // Make sure the version is parseable.
190    let _: semver::Version = state.build_info.version.parse().expect("invalid version");
191
192    let maybe_disk_catalog = state
193        .with_catalog_copy(
194            system_parameter_defaults,
195            state.build_info,
196            &state.materialize.bootstrap_args,
197            // The expression cache can be taxing on the CPU and is unnecessary for consistency checks.
198            Some(false),
199            |catalog| catalog.state().clone(),
200        )
201        .await
202        .map_err(|e| anyhow!("failed to read on-disk catalog state: {e}"))?
203        .map(|catalog| {
204            catalog
205                // The set of unfinalized shards in the catalog are updated asynchronously by
206                // background processes. As a result, the value may legitimately change after
207                // fetching the memory catalog but before fetching the disk catalog, causing the
208                // comparison to fail. This is a gross hack that always sets the disk catalog's
209                // unfinalized shards equal to the memory catalog's unfinalized shards to ignore
210                // false negatives. Unfortunately, we also end up ignoring true negatives.
211                .dump(unfinalized_shards)
212                .expect("state must be dumpable")
213        });
214    let Some(disk_catalog) = maybe_disk_catalog else {
215        // TODO(parkmycar, def-): Ideally this could be an error, but a lot of test suites fail. We
216        // should explicitly disable consistency check in these test suites.
217        tracing::warn!("No Catalog state on disk, skipping consistency check");
218        return Ok(());
219    };
220
221    if disk_catalog != memory_catalog {
222        // The state objects here are around 100k lines pretty printed, so find the
223        // first lines that differs and show context around it.
224        let diff = similar::TextDiff::from_lines(&memory_catalog, &disk_catalog)
225            .unified_diff()
226            .context_radius(50)
227            .to_string()
228            .lines()
229            .take(200)
230            .collect::<Vec<_>>()
231            .join("\n");
232
233        bail!("the in-memory state of the catalog does not match its on-disk state:\n{diff}");
234    }
235
236    Ok(())
237}
238
239/// Checks if the provided `shard_id` is a tombstone, returning an error if it's not.
240async fn check_shard_tombstone(state: &State, shard_id: &str) -> Result<(), anyhow::Error> {
241    println!("$ check-shard-tombstone {shard_id}");
242
243    let (Some(consensus_uri), Some(blob_uri)) =
244        (&state.persist_consensus_url, &state.persist_blob_url)
245    else {
246        // TODO(parkmycar): Testdrive on Cloud Test doesn't currently supply the Persist URLs.
247        tracing::warn!("Persist consensus or blob URL not known");
248        return Ok(());
249    };
250
251    let location = PersistLocation {
252        blob_uri: blob_uri.clone(),
253        consensus_uri: consensus_uri.clone(),
254    };
255    let client = state
256        .persist_clients
257        .open(location)
258        .await
259        .context("openning persist client")?;
260    let shard_id = ShardId::from_str(shard_id).map_err(|s| anyhow!("invalid ShardId: {s}"))?;
261
262    // It might take the storage-controller a moment to drop it's handles, so do a couple retries.
263    let (_client, result) = Retry::default()
264        .max_duration(state.timeout)
265        .retry_async_with_state(client, |retry_state, client| async move {
266            let inspect_state = client
267                .inspect_shard::<mz_repr::Timestamp>(&shard_id)
268                .await
269                .context("inspecting shard")
270                .and_then(|state| serde_json::to_value(state).context("to json"))
271                .and_then(|state| {
272                    serde_json::from_value::<ShardState>(state).context("to shard state")
273                });
274
275            let result = match inspect_state {
276                Ok(state) if state.is_tombstone() => RetryResult::Ok(()),
277                Ok(state) => {
278                    if retry_state.i == 0 {
279                        print!("shard isn't tombstoned; sleeping to see if it gets cleaned up.");
280                    }
281                    if let Some(backoff) = retry_state.next_backoff {
282                        if !backoff.is_zero() {
283                            print!(" {:.0?}", backoff);
284                        }
285                    }
286                    std::io::stdout().flush().expect("flushing stdout");
287
288                    RetryResult::RetryableErr(anyhow!("non-tombstone state: {state:?}"))
289                }
290                Result::Err(e) => RetryResult::FatalErr(e),
291            };
292
293            (client, result)
294        })
295        .await;
296
297    result
298}
299
300/// Parts of a shard's state that we read to determine if it's a tombstone.
301#[derive(Debug, Serialize, Deserialize)]
302struct ShardState {
303    leased_readers: BTreeMap<String, serde_json::Value>,
304    critical_readers: BTreeMap<String, serde_json::Value>,
305    writers: BTreeMap<String, serde_json::Value>,
306    since: Vec<mz_repr::Timestamp>,
307    upper: Vec<mz_repr::Timestamp>,
308}
309
310impl ShardState {
311    /// Returns if this shard is currently a tombstsone.
312    fn is_tombstone(&self) -> bool {
313        self.upper.is_empty()
314            && self.since.is_empty()
315            && self.writers.is_empty()
316            && self.leased_readers.is_empty()
317            && self.critical_readers.is_empty()
318    }
319}