Skip to main content

mz_testdrive/action/
consistency.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Write;
12use std::io::Write as _;
13use std::str::FromStr;
14use std::time::Duration;
15
16use crate::action;
17use crate::action::{ControlFlow, Run, State};
18use crate::parser::{BuiltinCommand, LineReader, parse};
19use anyhow::{Context, anyhow, bail};
20use mz_ore::retry::{Retry, RetryResult};
21use mz_persist_client::{PersistLocation, ShardId};
22use mz_postgres_util::{query_one, sql};
23use reqwest::StatusCode;
24use serde::{Deserialize, Serialize};
25
// Selects how often testdrive runs its consistency checks. The derives below make the type
// usable as a `clap` value enum and give it a `Default` implementation.
/// Level of consistency checks we should enable on a testdrive run.
#[derive(clap::ValueEnum, Default, Debug, Copy, Clone, PartialEq, Eq)]
pub enum Level {
    /// Run the consistency checks after the completion of a test file.
    // Marked `#[default]`, so `Level::default()` yields this variant.
    #[default]
    File,
    /// Run the consistency checks after each statement, good for debugging.
    Statement,
    /// Disable consistency checks entirely.
    Disable,
}
37
38impl FromStr for Level {
39    type Err = String;
40
41    fn from_str(s: &str) -> Result<Self, Self::Err> {
42        match s {
43            "file" => Ok(Level::File),
44            "statement" => Ok(Level::Statement),
45            "disable" => Ok(Level::Disable),
46            s => Err(format!("Unknown consistency check level: {s}")),
47        }
48    }
49}
50
51/// Skips consistency checks for the current file.
52pub fn skip_consistency_checks(
53    mut cmd: BuiltinCommand,
54    state: &mut State,
55) -> Result<ControlFlow, anyhow::Error> {
56    let reason = cmd
57        .args
58        .string("reason")
59        .context("must provide reason for skipping")?;
60    tracing::info!(reason, "Skipping consistency checks as requested.");
61
62    state.consistency_checks_adhoc_skip = true;
63    Ok(ControlFlow::Continue)
64}
65
66/// Runs consistency checks against multiple parts of Materialize to make sure we haven't violated
67/// our invariants or leaked resources.
68pub async fn run_consistency_checks(state: &State) -> Result<ControlFlow, anyhow::Error> {
69    // Return early if the user adhoc disabled consistency checks for the current file.
70    if state.consistency_checks_adhoc_skip {
71        return Ok(ControlFlow::Continue);
72    }
73
74    let coordinator = check_coordinator(state).await.context("coordinator");
75    let catalog_state = check_catalog_state(state).await.context("catalog state");
76    let statement_logging_state = if state.check_statement_logging {
77        check_statement_logging(state)
78            .await
79            .context("statement logging state")
80    } else {
81        Ok(())
82    };
83    // TODO(parkmycar): Fix subsources so they don't leak their shards and then add a leaked shards
84    // consistency check.
85
86    // Make sure to report all inconsistencies, not just the first.
87    let mut msg = String::new();
88    if let Err(e) = coordinator {
89        writeln!(&mut msg, "coordinator inconsistency: {e:?}")?;
90    }
91    if let Err(e) = catalog_state {
92        writeln!(&mut msg, "catalog inconsistency: {e:?}")?;
93    }
94    if let Err(e) = statement_logging_state {
95        writeln!(&mut msg, "statement logging inconsistency: {e:?}")?;
96    }
97
98    if msg.is_empty() {
99        Ok(ControlFlow::Continue)
100    } else {
101        Err(anyhow!("{msg}"))
102    }
103}
104
105/// Checks if a shard in Persist has been tombstoned.
106///
107/// TODO(parkmycar): Run this as part of the consistency checks, instead of as a specific command.
108pub async fn run_check_shard_tombstone(
109    mut cmd: BuiltinCommand,
110    state: &State,
111) -> Result<ControlFlow, anyhow::Error> {
112    let shard_id = cmd.args.string("shard-id")?;
113    check_shard_tombstone(state, &shard_id).await?;
114    Ok(ControlFlow::Continue)
115}
116
117/// Asks the Coordinator to run it's own internal consistency checks.
118async fn check_coordinator(state: &State) -> Result<(), anyhow::Error> {
119    // Make sure we can dump the Coordinator state.
120    let response = reqwest::get(&format!(
121        "http://{}/api/coordinator/dump",
122        state.materialize.internal_http_addr
123    ))
124    .await?;
125    // We allow NOT_FOUND to support upgrade tests where this endpoint doesn't yet exist.
126    if !response.status().is_success() && response.status() != StatusCode::NOT_FOUND {
127        let response: Result<serde_json::Value, _> = response.json().await;
128        bail!("Coordinator failed to dump state: {:?}", response);
129    }
130
131    // Run the consistency checks.
132    let response = Retry::default()
133        .max_duration(Duration::from_secs(2))
134        .retry_async(|_| async {
135            reqwest::get(&format!(
136                "http://{}/api/coordinator/check",
137                state.materialize.internal_http_addr,
138            ))
139            .await
140        })
141        .await
142        .context("querying coordinator")?;
143    if response.status() == StatusCode::NOT_FOUND {
144        bail!("Coordinator consistency check not available");
145    }
146
147    let inconsistencies: serde_json::Value =
148        response.json().await.context("deserialize response")?;
149
150    match inconsistencies {
151        serde_json::Value::String(x) if x.is_empty() => Ok(()),
152        other => Err(anyhow!("coordinator inconsistencies! {other:?}")),
153    }
154}
155
156/// Checks that the in-memory catalog matches what we have persisted on disk.
157async fn check_catalog_state(state: &State) -> Result<(), anyhow::Error> {
158    #[derive(Debug, Deserialize)]
159    struct StorageMetadata {
160        unfinalized_shards: Option<BTreeSet<String>>,
161    }
162
163    #[derive(Debug, Deserialize)]
164    struct CatalogDump {
165        system_parameter_defaults: Option<BTreeMap<String, String>>,
166        storage_metadata: Option<StorageMetadata>,
167    }
168
169    // Dump the in-memory catalog state of the Materialize environment that we're
170    // connected to.
171    let memory_catalog = reqwest::get(&format!(
172        "http://{}/api/catalog/dump",
173        state.materialize.internal_http_addr,
174    ))
175    .await
176    .context("GET catalog")?
177    .text()
178    .await
179    .context("deserialize catalog")?;
180
181    // Pull out the system parameter defaults from the in-memory catalog, as we
182    // need to load the disk catalog with the same defaults.
183    let dump: CatalogDump = serde_json::from_str(&memory_catalog).context("decoding catalog")?;
184
185    let Some(system_parameter_defaults) = dump.system_parameter_defaults else {
186        // TODO(parkmycar, def-): Ideally this could be an error, but a lot of test suites fail. We
187        // should explicitly disable consistency check in these test suites.
188        tracing::warn!(
189            "Missing system_parameter_defaults in memory catalog state, skipping consistency check"
190        );
191        return Ok(());
192    };
193
194    let unfinalized_shards = dump
195        .storage_metadata
196        .and_then(|storage_metadata| storage_metadata.unfinalized_shards);
197
198    // Load the on-disk catalog and dump its state.
199
200    // Make sure the version is parseable.
201    let _: semver::Version = state.build_info.version.parse().expect("invalid version");
202
203    let maybe_disk_catalog = state
204        .with_catalog_copy(
205            system_parameter_defaults,
206            state.build_info,
207            &state.materialize.bootstrap_args,
208            // The expression cache can be taxing on the CPU and is unnecessary for consistency checks.
209            Some(false),
210            |catalog| catalog.state().clone(),
211        )
212        .await
213        .map_err(|e| anyhow!("failed to read on-disk catalog state: {e}"))?
214        .map(|catalog| {
215            catalog
216                // The set of unfinalized shards in the catalog are updated asynchronously by
217                // background processes. As a result, the value may legitimately change after
218                // fetching the memory catalog but before fetching the disk catalog, causing the
219                // comparison to fail. This is a gross hack that always sets the disk catalog's
220                // unfinalized shards equal to the memory catalog's unfinalized shards to ignore
221                // false negatives. Unfortunately, we also end up ignoring true negatives.
222                .dump(unfinalized_shards)
223                .expect("state must be dumpable")
224        });
225    let Some(disk_catalog) = maybe_disk_catalog else {
226        // TODO(parkmycar, def-): Ideally this could be an error, but a lot of test suites fail. We
227        // should explicitly disable consistency check in these test suites.
228        tracing::warn!("No Catalog state on disk, skipping consistency check");
229        return Ok(());
230    };
231
232    if disk_catalog != memory_catalog {
233        // The state objects here are around 100k lines pretty printed, so find the
234        // first lines that differs and show context around it.
235        let diff = similar::TextDiff::from_lines(&memory_catalog, &disk_catalog)
236            .unified_diff()
237            .context_radius(50)
238            .to_string()
239            .lines()
240            .take(200)
241            .collect::<Vec<_>>()
242            .join("\n");
243
244        bail!("the in-memory state of the catalog does not match its on-disk state:\n{diff}");
245    }
246
247    Ok(())
248}
249
250/// This currently checks only whether the statement log reports all statements to be in a finished
251/// state. (We used to have an assertion for roughly this in `ExecuteContextExtra`'s Drop, but that
252/// had to be removed due to <https://github.com/MaterializeInc/database-issues/issues/7304>)
253///
254/// Note that this check should succeed regardless of the statement logging sampling rate.
255///
256/// Ideally, we could run this at any moment successfully, but currently system restarts can mess
257/// this up: there is a buffering of statement log writes, with the buffers flushed every 5 seconds.
258/// So, if a system kill/restart comes at a bad moment, then some statements might get permanently
259/// stuck in an unfinished state in the statement log. Therefore, we currently run this only after
260/// normal `.td`s, but not after cluster tests and whatnot that kill/restart the system.
261/// (Also, this can take several seconds due to the 5 sec buffering, so we run this only in Nightly
262/// by default.)
263async fn check_statement_logging(orig_state: &State) -> Result<(), anyhow::Error> {
264    use crate::util::postgres::postgres_client;
265
266    // Create new Testdrive state, so that we create a new session to Materialize, and we forget any
267    // weird Testdrive setting that the `.td` file before us might have set.
268    let (mut state, state_cleanup) = action::create_state(&orig_state.config).await?;
269
270    // First, query the current value of enable_rbac_checks so we can restore it later
271    let mz_system_url = format!(
272        "postgres://mz_system:materialize@{}",
273        state.materialize.internal_sql_addr
274    );
275
276    let (client, _handle) = postgres_client(&mz_system_url, state.default_timeout)
277        .await
278        .context("connecting as mz_system to query enable_rbac_checks")?;
279
280    let row = query_one(&client, sql!("SHOW enable_rbac_checks"), &[])
281        .await
282        .context("querying enable_rbac_checks")?;
283
284    let original_value: String = row.get(0);
285
286    // Create a testdrive script to check that all statements have finished executing.
287    // We disable RBAC checks so we can query mz_internal tables, similar to statement-logging.td.
288    // We restore the setting to its original value at the end.
289    let check_script = format!(
290        r#"
291$ postgres-execute connection=postgres://mz_system:materialize@{0}
292ALTER SYSTEM SET enable_rbac_checks = false
293
294> SELECT count(*)
295  FROM mz_internal.mz_recent_activity_log
296  WHERE
297    (finished_at IS NULL OR finished_status IS NULL)
298    AND sql NOT LIKE '%__FILTER-OUT-THIS-QUERY__%'
299    AND finished_status != 'aborted';
3000
301
302$ postgres-execute connection=postgres://mz_system:materialize@{0}
303ALTER SYSTEM SET enable_rbac_checks = {1}
304"#,
305        state.materialize.internal_sql_addr, original_value
306    );
307
308    let mut line_reader = LineReader::new(&check_script);
309    let cmds = parse(&mut line_reader).map_err(|e| anyhow!("{}", e.source))?;
310
311    for cmd in cmds {
312        cmd.run(&mut state)
313            .await
314            .map_err(|e| anyhow!("{}", e.source))?;
315    }
316
317    drop(state);
318    state_cleanup.await?;
319
320    Ok(())
321}
322
323/// Checks if the provided `shard_id` is a tombstone, returning an error if it's not.
324async fn check_shard_tombstone(state: &State, shard_id: &str) -> Result<(), anyhow::Error> {
325    println!("$ check-shard-tombstone {shard_id}");
326
327    let (Some(consensus_uri), Some(blob_uri)) =
328        (&state.persist_consensus_url, &state.persist_blob_url)
329    else {
330        // TODO(parkmycar): Testdrive on Cloud Test doesn't currently supply the Persist URLs.
331        tracing::warn!("Persist consensus or blob URL not known");
332        return Ok(());
333    };
334
335    let location = PersistLocation {
336        blob_uri: blob_uri.clone(),
337        consensus_uri: consensus_uri.clone(),
338    };
339    let client = state
340        .persist_clients
341        .open(location)
342        .await
343        .context("openning persist client")?;
344    let shard_id = ShardId::from_str(shard_id).map_err(|s| anyhow!("invalid ShardId: {s}"))?;
345
346    // It might take the storage-controller a moment to drop it's handles, so do a couple retries.
347    let (_client, result) = Retry::default()
348        .max_duration(state.timeout)
349        .retry_async_with_state(client, |retry_state, client| async move {
350            let inspect_state = client
351                .inspect_shard::<mz_repr::Timestamp>(&shard_id)
352                .await
353                .context("inspecting shard")
354                .and_then(|state| serde_json::to_value(state).context("to json"))
355                .and_then(|state| {
356                    serde_json::from_value::<ShardState>(state).context("to shard state")
357                });
358
359            let result = match inspect_state {
360                Ok(state) if state.is_tombstone() => RetryResult::Ok(()),
361                Ok(state) => {
362                    if retry_state.i == 0 {
363                        print!("shard isn't tombstoned; sleeping to see if it gets cleaned up.");
364                    }
365                    if let Some(backoff) = retry_state.next_backoff {
366                        if !backoff.is_zero() {
367                            print!(" {:.0?}", backoff);
368                        }
369                    }
370                    std::io::stdout().flush().expect("flushing stdout");
371
372                    RetryResult::RetryableErr(anyhow!("non-tombstone state: {state:?}"))
373                }
374                Result::Err(e) => RetryResult::FatalErr(e),
375            };
376
377            (client, result)
378        })
379        .await;
380
381    result
382}
383
/// Parts of a shard's state that we read to determine if it's a tombstone.
///
/// `check_shard_tombstone` builds this by converting the inspected shard state to JSON and
/// deserializing it back into this struct. No `deny_unknown_fields` attribute is set, so any
/// other fields in that JSON are ignored by serde.
#[derive(Debug, Serialize, Deserialize)]
struct ShardState {
    // The three handle maps keep their values as raw JSON: `is_tombstone` only looks at
    // whether each map is empty, never at the entries themselves.
    leased_readers: BTreeMap<String, serde_json::Value>,
    critical_readers: BTreeMap<String, serde_json::Value>,
    writers: BTreeMap<String, serde_json::Value>,
    // Both must be empty for the shard to count as a tombstone.
    // NOTE(review): presumably the elements of the shard's since/upper frontiers — confirm
    // against the Persist state format.
    since: Vec<mz_repr::Timestamp>,
    upper: Vec<mz_repr::Timestamp>,
}
393
394impl ShardState {
395    /// Returns if this shard is currently a tombstsone.
396    fn is_tombstone(&self) -> bool {
397        self.upper.is_empty()
398            && self.since.is_empty()
399            && self.writers.is_empty()
400            && self.leased_readers.is_empty()
401            && self.critical_readers.is_empty()
402    }
403}