mz_testdrive/action/consistency.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::io::Write as _;
use std::str::FromStr;
use std::time::Duration;

use crate::action;
use crate::action::{ControlFlow, Run, State};
use crate::parser::{BuiltinCommand, LineReader, parse};
use anyhow::{Context, anyhow, bail};
use mz_ore::retry::{Retry, RetryResult};
use mz_persist_client::{PersistLocation, ShardId};
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};

/// Level of consistency checks we should enable on a testdrive run.
#[derive(clap::ValueEnum, Default, Debug, Copy, Clone, PartialEq, Eq)]
pub enum Level {
    /// Run the consistency checks after the completion of a test file.
    #[default]
    File,
    /// Run the consistency checks after each statement, good for debugging.
    Statement,
    /// Disable consistency checks entirely.
    Disable,
}

impl FromStr for Level {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "file" => Ok(Level::File),
            "statement" => Ok(Level::Statement),
            "disable" => Ok(Level::Disable),
            s => Err(format!("Unknown consistency check level: {s}")),
        }
    }
}
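
// A minimal sketch (not part of the original file): the `FromStr` impl above accepts the
// lowercase variant names and rejects anything else. It relies only on the derived
// `Debug`/`PartialEq` impls; the module and test names are hypothetical.
#[cfg(test)]
mod level_parse_sketch {
    use super::Level;

    #[test]
    fn parses_known_levels() {
        assert_eq!("file".parse::<Level>(), Ok(Level::File));
        assert_eq!("statement".parse::<Level>(), Ok(Level::Statement));
        assert_eq!("disable".parse::<Level>(), Ok(Level::Disable));
        assert!("everything".parse::<Level>().is_err());
    }
}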

/// Skips consistency checks for the current file.
pub fn skip_consistency_checks(
    mut cmd: BuiltinCommand,
    state: &mut State,
) -> Result<ControlFlow, anyhow::Error> {
    let reason = cmd
        .args
        .string("reason")
        .context("must provide reason for skipping")?;
    tracing::info!(reason, "Skipping consistency checks as requested.");

    state.consistency_checks_adhoc_skip = true;
    Ok(ControlFlow::Continue)
}
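
// A hypothetical testdrive invocation of the command above. The `skip-consistency-checks`
// spelling is assumed from the underscore-to-hyphen convention used by other commands in
// this file (e.g. `check-shard-tombstone`), and the reason text is purely illustrative:
//
//   $ skip-consistency-checks reason="this file intentionally leaves resources behind"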

/// Runs consistency checks against multiple parts of Materialize to make sure we haven't violated
/// our invariants or leaked resources.
pub async fn run_consistency_checks(state: &State) -> Result<ControlFlow, anyhow::Error> {
    // Return early if the user adhoc disabled consistency checks for the current file.
    if state.consistency_checks_adhoc_skip {
        return Ok(ControlFlow::Continue);
    }

    let coordinator = check_coordinator(state).await.context("coordinator");
    let catalog_state = check_catalog_state(state).await.context("catalog state");
    let statement_logging_state = if state.check_statement_logging {
        check_statement_logging(state)
            .await
            .context("statement logging state")
    } else {
        Ok(())
    };
    // TODO(parkmycar): Fix subsources so they don't leak their shards and then add a leaked shards
    // consistency check.

    // Make sure to report all inconsistencies, not just the first.
    let mut msg = String::new();
    if let Err(e) = coordinator {
        writeln!(&mut msg, "coordinator inconsistency: {e:?}")?;
    }
    if let Err(e) = catalog_state {
        writeln!(&mut msg, "catalog inconsistency: {e:?}")?;
    }
    if let Err(e) = statement_logging_state {
        writeln!(&mut msg, "statement logging inconsistency: {e:?}")?;
    }

    if msg.is_empty() {
        Ok(ControlFlow::Continue)
    } else {
        Err(anyhow!("{msg}"))
    }
}

/// Checks if a shard in Persist has been tombstoned.
///
/// TODO(parkmycar): Run this as part of the consistency checks, instead of as a specific command.
pub async fn run_check_shard_tombstone(
    mut cmd: BuiltinCommand,
    state: &State,
) -> Result<ControlFlow, anyhow::Error> {
    let shard_id = cmd.args.string("shard-id")?;
    check_shard_tombstone(state, &shard_id).await?;
    Ok(ControlFlow::Continue)
}

/// Asks the Coordinator to run its own internal consistency checks.
async fn check_coordinator(state: &State) -> Result<(), anyhow::Error> {
    // Make sure we can dump the Coordinator state.
    let response = reqwest::get(&format!(
        "http://{}/api/coordinator/dump",
        state.materialize.internal_http_addr
    ))
    .await?;
    // We allow NOT_FOUND to support upgrade tests where this endpoint doesn't yet exist.
    if !response.status().is_success() && response.status() != StatusCode::NOT_FOUND {
        let response: Result<serde_json::Value, _> = response.json().await;
        bail!("Coordinator failed to dump state: {:?}", response);
    }

    // Run the consistency checks.
    let response = Retry::default()
        .max_duration(Duration::from_secs(2))
        .retry_async(|_| async {
            reqwest::get(&format!(
                "http://{}/api/coordinator/check",
                state.materialize.internal_http_addr,
            ))
            .await
        })
        .await
        .context("querying coordinator")?;
    if response.status() == StatusCode::NOT_FOUND {
        bail!("Coordinator consistency check not available");
    }

    let inconsistencies: serde_json::Value =
        response.json().await.context("deserialize response")?;

    match inconsistencies {
        serde_json::Value::String(x) if x.is_empty() => Ok(()),
        other => Err(anyhow!("coordinator inconsistencies! {other:?}")),
    }
}
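
// A minimal sketch (not part of the original file) of the response contract that
// `check_coordinator` above relies on: the check endpoint returns a JSON string, and only an
// empty string is treated as "no inconsistencies found". The module name and the example
// message are hypothetical.
#[cfg(test)]
mod coordinator_check_sketch {
    #[test]
    fn only_an_empty_string_means_consistent() {
        let consistent = serde_json::json!("");
        assert!(matches!(&consistent, serde_json::Value::String(s) if s.is_empty()));

        let inconsistent = serde_json::json!("some inconsistency description");
        assert!(!matches!(&inconsistent, serde_json::Value::String(s) if s.is_empty()));
    }
}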

/// Checks that the in-memory catalog matches what we have persisted on disk.
async fn check_catalog_state(state: &State) -> Result<(), anyhow::Error> {
    #[derive(Debug, Deserialize)]
    struct StorageMetadata {
        unfinalized_shards: Option<BTreeSet<String>>,
    }

    #[derive(Debug, Deserialize)]
    struct CatalogDump {
        system_parameter_defaults: Option<BTreeMap<String, String>>,
        storage_metadata: Option<StorageMetadata>,
    }

    // Dump the in-memory catalog state of the Materialize environment that we're
    // connected to.
    let memory_catalog = reqwest::get(&format!(
        "http://{}/api/catalog/dump",
        state.materialize.internal_http_addr,
    ))
    .await
    .context("GET catalog")?
    .text()
    .await
    .context("deserialize catalog")?;

    // Pull out the system parameter defaults from the in-memory catalog, as we
    // need to load the disk catalog with the same defaults.
    let dump: CatalogDump = serde_json::from_str(&memory_catalog).context("decoding catalog")?;

    let Some(system_parameter_defaults) = dump.system_parameter_defaults else {
        // TODO(parkmycar, def-): Ideally this could be an error, but a lot of test suites fail. We
        // should explicitly disable consistency check in these test suites.
        tracing::warn!(
            "Missing system_parameter_defaults in memory catalog state, skipping consistency check"
        );
        return Ok(());
    };

    let unfinalized_shards = dump
        .storage_metadata
        .and_then(|storage_metadata| storage_metadata.unfinalized_shards);

    // Load the on-disk catalog and dump its state.

    // Make sure the version is parseable.
    let _: semver::Version = state.build_info.version.parse().expect("invalid version");

    let maybe_disk_catalog = state
        .with_catalog_copy(
            system_parameter_defaults,
            state.build_info,
            &state.materialize.bootstrap_args,
            // The expression cache can be taxing on the CPU and is unnecessary for consistency checks.
            Some(false),
            |catalog| catalog.state().clone(),
        )
        .await
        .map_err(|e| anyhow!("failed to read on-disk catalog state: {e}"))?
        .map(|catalog| {
            catalog
                // The set of unfinalized shards in the catalog is updated asynchronously by
                // background processes. As a result, the value may legitimately change after
                // fetching the memory catalog but before fetching the disk catalog, causing the
                // comparison to fail. This is a gross hack that always sets the disk catalog's
                // unfinalized shards equal to the memory catalog's unfinalized shards to ignore
                // false negatives. Unfortunately, we also end up ignoring true negatives.
                .dump(unfinalized_shards)
                .expect("state must be dumpable")
        });
    let Some(disk_catalog) = maybe_disk_catalog else {
        // TODO(parkmycar, def-): Ideally this could be an error, but a lot of test suites fail. We
        // should explicitly disable consistency check in these test suites.
        tracing::warn!("No Catalog state on disk, skipping consistency check");
        return Ok(());
    };

    if disk_catalog != memory_catalog {
        // The state objects here are around 100k lines pretty printed, so find the
        // first lines that differ and show context around them.
        let diff = similar::TextDiff::from_lines(&memory_catalog, &disk_catalog)
            .unified_diff()
            .context_radius(50)
            .to_string()
            .lines()
            .take(200)
            .collect::<Vec<_>>()
            .join("\n");

        bail!("the in-memory state of the catalog does not match its on-disk state:\n{diff}");
    }

    Ok(())
}
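
// A minimal sketch (not part of the original file): `check_catalog_state` deserializes the
// full catalog dump into a struct that declares only the fields it needs, relying on serde's
// default behavior of ignoring unknown fields and mapping missing `Option` fields to `None`.
// The struct below is a hypothetical mirror of the private `CatalogDump` defined inside
// `check_catalog_state`, and the field values are illustrative only.
#[cfg(test)]
mod catalog_dump_sketch {
    use std::collections::BTreeMap;

    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    struct CatalogDumpSketch {
        system_parameter_defaults: Option<BTreeMap<String, String>>,
    }

    #[test]
    fn ignores_unknown_fields_and_defaults_missing_options() {
        // Unknown top-level keys are ignored; the missing `system_parameter_defaults`
        // field becomes `None`.
        let dump: CatalogDumpSketch = serde_json::from_str(r#"{"databases": []}"#).unwrap();
        assert!(dump.system_parameter_defaults.is_none());

        // When present, the defaults are decoded into a `BTreeMap`.
        let dump: CatalogDumpSketch =
            serde_json::from_str(r#"{"system_parameter_defaults": {"max_tables": "100"}}"#)
                .unwrap();
        assert_eq!(
            dump.system_parameter_defaults.unwrap().get("max_tables"),
            Some(&"100".to_string())
        );
    }
}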

/// This currently checks only whether the statement log reports all statements to be in a finished
/// state. (We used to have an assertion for roughly this in `ExecuteContextExtra`'s Drop, but that
/// had to be removed due to <https://github.com/MaterializeInc/database-issues/issues/7304>)
///
/// Note that this check should succeed regardless of the statement logging sampling rate.
///
/// Ideally, we could run this at any moment successfully, but currently system restarts can mess
/// this up: there is a buffering of statement log writes, with the buffers flushed every 5 seconds.
/// So, if a system kill/restart comes at a bad moment, then some statements might get permanently
/// stuck in an unfinished state in the statement log. Therefore, we currently run this only after
/// normal `.td`s, but not after cluster tests and whatnot that kill/restart the system.
/// (Also, this can take several seconds due to the 5 sec buffering, so we run this only in Nightly
/// by default.)
async fn check_statement_logging(orig_state: &State) -> Result<(), anyhow::Error> {
    use crate::util::postgres::postgres_client;

    // Create new Testdrive state, so that we create a new session to Materialize, and we forget any
    // weird Testdrive setting that the `.td` file before us might have set.
    let (mut state, state_cleanup) = action::create_state(&orig_state.config).await?;

    // First, query the current value of enable_rbac_checks so we can restore it later
    let mz_system_url = format!(
        "postgres://mz_system:materialize@{}",
        state.materialize.internal_sql_addr
    );

    let (client, _handle) = postgres_client(&mz_system_url, state.default_timeout)
        .await
        .context("connecting as mz_system to query enable_rbac_checks")?;

    let row = client
        .query_one("SHOW enable_rbac_checks", &[])
        .await
        .context("querying enable_rbac_checks")?;

    let original_value: String = row.get(0);

    // Create a testdrive script to check that all statements have finished executing.
    // We disable RBAC checks so we can query mz_internal tables, similar to statement-logging.td.
    // We restore the setting to its original value at the end.
    let check_script = format!(
        r#"
$ postgres-execute connection=postgres://mz_system:materialize@{0}
ALTER SYSTEM SET enable_rbac_checks = false

> SELECT count(*)
  FROM mz_internal.mz_recent_activity_log
  WHERE
    (finished_at IS NULL OR finished_status IS NULL)
    AND sql NOT LIKE '%__FILTER-OUT-THIS-QUERY__%'
    AND finished_status != 'aborted';
0

$ postgres-execute connection=postgres://mz_system:materialize@{0}
ALTER SYSTEM SET enable_rbac_checks = {1}
"#,
        state.materialize.internal_sql_addr, original_value
    );

    let mut line_reader = LineReader::new(&check_script);
    let cmds = parse(&mut line_reader).map_err(|e| anyhow!("{}", e.source))?;

    for cmd in cmds {
        cmd.run(&mut state)
            .await
            .map_err(|e| anyhow!("{}", e.source))?;
    }

    drop(state);
    state_cleanup.await?;

    Ok(())
}

/// Checks if the provided `shard_id` is a tombstone, returning an error if it's not.
async fn check_shard_tombstone(state: &State, shard_id: &str) -> Result<(), anyhow::Error> {
    println!("$ check-shard-tombstone {shard_id}");

    let (Some(consensus_uri), Some(blob_uri)) =
        (&state.persist_consensus_url, &state.persist_blob_url)
    else {
        // TODO(parkmycar): Testdrive on Cloud Test doesn't currently supply the Persist URLs.
        tracing::warn!("Persist consensus or blob URL not known");
        return Ok(());
    };

    let location = PersistLocation {
        blob_uri: blob_uri.clone(),
        consensus_uri: consensus_uri.clone(),
    };
    let client = state
        .persist_clients
        .open(location)
        .await
        .context("opening persist client")?;
    let shard_id = ShardId::from_str(shard_id).map_err(|s| anyhow!("invalid ShardId: {s}"))?;

    // It might take the storage-controller a moment to drop its handles, so do a couple of retries.
    let (_client, result) = Retry::default()
        .max_duration(state.timeout)
        .retry_async_with_state(client, |retry_state, client| async move {
            let inspect_state = client
                .inspect_shard::<mz_repr::Timestamp>(&shard_id)
                .await
                .context("inspecting shard")
                .and_then(|state| serde_json::to_value(state).context("to json"))
                .and_then(|state| {
                    serde_json::from_value::<ShardState>(state).context("to shard state")
                });

            let result = match inspect_state {
                Ok(state) if state.is_tombstone() => RetryResult::Ok(()),
                Ok(state) => {
                    if retry_state.i == 0 {
                        print!("shard isn't tombstoned; sleeping to see if it gets cleaned up.");
                    }
                    if let Some(backoff) = retry_state.next_backoff {
                        if !backoff.is_zero() {
                            print!(" {:.0?}", backoff);
                        }
                    }
                    std::io::stdout().flush().expect("flushing stdout");

                    RetryResult::RetryableErr(anyhow!("non-tombstone state: {state:?}"))
                }
                Result::Err(e) => RetryResult::FatalErr(e),
            };

            (client, result)
        })
        .await;

    result
}

/// Parts of a shard's state that we read to determine if it's a tombstone.
#[derive(Debug, Serialize, Deserialize)]
struct ShardState {
    leased_readers: BTreeMap<String, serde_json::Value>,
    critical_readers: BTreeMap<String, serde_json::Value>,
    writers: BTreeMap<String, serde_json::Value>,
    since: Vec<mz_repr::Timestamp>,
    upper: Vec<mz_repr::Timestamp>,
}

impl ShardState {
    /// Returns whether this shard is currently a tombstone.
    fn is_tombstone(&self) -> bool {
        self.upper.is_empty()
            && self.since.is_empty()
            && self.writers.is_empty()
            && self.leased_readers.is_empty()
            && self.critical_readers.is_empty()
    }
}
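
// A minimal sketch (not part of the original file): a shard state with empty `since` and
// `upper` antichains and no readers or writers is exactly what `is_tombstone` reports as a
// tombstone. The JSON below mirrors the fields of `ShardState` above; the writer id in the
// second case is a made-up placeholder.
#[cfg(test)]
mod shard_state_sketch {
    use super::ShardState;

    #[test]
    fn empty_state_is_a_tombstone() {
        let json = r#"{
            "leased_readers": {},
            "critical_readers": {},
            "writers": {},
            "since": [],
            "upper": []
        }"#;
        let state: ShardState = serde_json::from_str(json).expect("valid shard state JSON");
        assert!(state.is_tombstone());

        // A shard that still has a registered writer is not considered a tombstone.
        let json = r#"{
            "leased_readers": {},
            "critical_readers": {},
            "writers": {"w00000000-0000-0000-0000-000000000000": {}},
            "since": [],
            "upper": []
        }"#;
        let state: ShardState = serde_json::from_str(json).expect("valid shard state JSON");
        assert!(!state.is_tombstone());
    }
}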