1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::io::Write as _;
use std::str::FromStr;
use std::time::Duration;

use anyhow::{anyhow, bail, Context};
use mz_ore::retry::{Retry, RetryResult};
use mz_persist_client::{PersistLocation, ShardId};
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};

use crate::action::{ControlFlow, State};
use crate::parser::BuiltinCommand;

/// Level of consistency checks we should enable on a testdrive run.
#[derive(clap::ValueEnum, Default, Debug, Copy, Clone, PartialEq, Eq)]
pub enum Level {
    /// Run the consistency checks after the completion of a test file.
    #[default]
    File,
    /// Run the consistency checks after each statement, good for debugging.
    Statement,
    /// Disable consistency checks entirely.
    Disable,
}

impl FromStr for Level {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "file" => Ok(Level::File),
            "statement" => Ok(Level::Statement),
            "disable" => Ok(Level::Disable),
            s => Err(format!("Unknown consistency check level: {s}")),
        }
    }
}

/// Skips consistency checks for the current file.
pub fn skip_consistency_checks(
    mut cmd: BuiltinCommand,
    state: &mut State,
) -> Result<ControlFlow, anyhow::Error> {
    let reason = cmd
        .args
        .string("reason")
        .context("must provide reason for skipping")?;
    tracing::info!(reason, "Skipping consistency checks as requested.");

    state.consistency_checks_adhoc_skip = true;
    Ok(ControlFlow::Continue)
}

/// Runs consistency checks against multiple parts of Materialize to make sure we haven't violated
/// our invariants or leaked resources.
pub async fn run_consistency_checks(state: &State) -> Result<ControlFlow, anyhow::Error> {
    // Return early if the user adhoc disabled consistency checks for the current file.
    if state.consistency_checks_adhoc_skip {
        return Ok(ControlFlow::Continue);
    }

    let coordinator = check_coordinator(state).await.context("coordinator");
    let catalog_state = check_catalog_state(state).await.context("catalog state");
    // TODO(parkmycar): Fix subsources so they don't leak their shards and then add a leaked shards
    // consistency check.

    // Make sure to report all inconsistencies, not just the first.
    let mut msg = String::new();
    if let Err(e) = coordinator {
        writeln!(&mut msg, "coordinator inconsistency: {e:?}")?;
    }
    if let Err(e) = catalog_state {
        writeln!(&mut msg, "catalog inconsistency: {e:?}")?;
    }

    if msg.is_empty() {
        Ok(ControlFlow::Continue)
    } else {
        Err(anyhow!("{msg}"))
    }
}

/// Checks if a shard in Persist has been tombstoned.
///
/// TODO(parkmycar): Run this as part of the consistency checks, instead of as a specific command.
pub async fn run_check_shard_tombstoned(
    mut cmd: BuiltinCommand,
    state: &State,
) -> Result<ControlFlow, anyhow::Error> {
    let shard_id = cmd.args.string("shard-id")?;
    check_shard_tombstoned(state, &shard_id).await?;
    Ok(ControlFlow::Continue)
}

/// Asks the Coordinator to run it's own internal consistency checks.
async fn check_coordinator(state: &State) -> Result<(), anyhow::Error> {
    // Make sure we can dump the Coordinator state.
    let response = reqwest::get(&format!(
        "http://{}/api/coordinator/dump",
        state.materialize.internal_http_addr
    ))
    .await?;
    // We allow NOT_FOUND to support upgrade tests where this endpoint doesn't yet exist.
    if !response.status().is_success() && response.status() != StatusCode::NOT_FOUND {
        let response: Result<serde_json::Value, _> = response.json().await;
        bail!("Coordinator failed to dump state: {:?}", response);
    }

    // Run the consistency checks.
    let response = Retry::default()
        .max_duration(Duration::from_secs(2))
        .retry_async(|_| async {
            reqwest::get(&format!(
                "http://{}/api/coordinator/check",
                state.materialize.internal_http_addr,
            ))
            .await
        })
        .await
        .context("querying coordinator")?;
    if response.status() == StatusCode::NOT_FOUND {
        bail!("Coordinator consistency check not available");
    }

    let inconsistencies: serde_json::Value =
        response.json().await.context("deserialize response")?;

    match inconsistencies {
        serde_json::Value::String(x) if x.is_empty() => Ok(()),
        other => Err(anyhow!("coordinator inconsistencies! {other:?}")),
    }
}

/// Checks that the in-memory catalog matches what we have persisted on disk.
async fn check_catalog_state(state: &State) -> Result<(), anyhow::Error> {
    #[derive(Debug, Deserialize)]
    struct StorageMetadata {
        unfinalized_shards: Option<BTreeSet<String>>,
    }

    #[derive(Debug, Deserialize)]
    struct CatalogDump {
        system_parameter_defaults: Option<BTreeMap<String, String>>,
        storage_metadata: Option<StorageMetadata>,
    }

    // Dump the in-memory catalog state of the Materialize environment that we're
    // connected to.
    let memory_catalog = reqwest::get(&format!(
        "http://{}/api/catalog/dump",
        state.materialize.internal_http_addr,
    ))
    .await
    .context("GET catalog")?
    .text()
    .await
    .context("deserialize catalog")?;

    // Pull out the system parameter defaults from the in-memory catalog, as we
    // need to load the disk catalog with the same defaults.
    let dump: CatalogDump = serde_json::from_str(&memory_catalog).context("decoding catalog")?;

    let Some(system_parameter_defaults) = dump.system_parameter_defaults else {
        // TODO(parkmycar, def-): Ideally this could be an error, but a lot of test suites fail. We
        // should explicitly disable consistency check in these test suites.
        tracing::warn!(
            "Missing system_parameter_defaults in memory catalog state, skipping consistency check"
        );
        return Ok(());
    };

    let unfinalized_shards = dump
        .storage_metadata
        .and_then(|storage_metadata| storage_metadata.unfinalized_shards);

    // Load the on-disk catalog and dump its state.
    let version: semver::Version = state.build_info.version.parse().expect("invalid version");
    let maybe_disk_catalog = state
        .with_catalog_copy(system_parameter_defaults, version, |catalog| {
            catalog.state().clone()
        })
        .await
        .map_err(|e| anyhow!("failed to read on-disk catalog state: {e}"))?
        .map(|catalog| {
            catalog
                // The set of unfinalized shards in the catalog are updated asynchronously by
                // background processes. As a result, the value may legitimately change after
                // fetching the memory catalog but before fetching the disk catalog, causing the
                // comparison to fail. This is a gross hack that always sets the disk catalog's
                // unfinalized shards equal to the memory catalog's unfinalized shards to ignore
                // false negatives. Unfortunately, we also end up ignoring true negatives.
                .dump(unfinalized_shards)
                .expect("state must be dumpable")
        });
    let Some(disk_catalog) = maybe_disk_catalog else {
        // TODO(parkmycar, def-): Ideally this could be an error, but a lot of test suites fail. We
        // should explicitly disable consistency check in these test suites.
        tracing::warn!("No Catalog state on disk, skipping consistency check");
        return Ok(());
    };

    if disk_catalog != memory_catalog {
        // The state objects here are around 100k lines pretty printed, so find the
        // first lines that differs and show context around it.
        let diff = similar::TextDiff::from_lines(&memory_catalog, &disk_catalog)
            .unified_diff()
            .context_radius(50)
            .to_string()
            .lines()
            .take(200)
            .collect::<Vec<_>>()
            .join("\n");

        bail!("the in-memory state of the catalog does not match its on-disk state:\n{diff}");
    }

    Ok(())
}

/// Checks if the provided `shard_id` is a tombstone, returning an error if it's not.
async fn check_shard_tombstoned(state: &State, shard_id: &str) -> Result<(), anyhow::Error> {
    println!("$ check-shard-tombstoned {shard_id}");

    let (Some(consensus_uri), Some(blob_uri)) =
        (&state.persist_consensus_url, &state.persist_blob_url)
    else {
        // TODO(parkmycar): Testdrive on Cloud Test doesn't currently supply the Persist URLs.
        tracing::warn!("Persist consensus or blob URL not known");
        return Ok(());
    };

    let location = PersistLocation {
        blob_uri: blob_uri.clone(),
        consensus_uri: consensus_uri.clone(),
    };
    let client = state
        .persist_clients
        .open(location)
        .await
        .context("openning persist client")?;
    let shard_id = ShardId::from_str(shard_id).map_err(|s| anyhow!("invalid ShardId: {s}"))?;

    // It might take the storage-controller a moment to drop it's handles, so do a couple retries.
    let (_client, result) = Retry::default()
        .max_duration(state.timeout)
        .retry_async_with_state(client, |retry_state, client| async move {
            let inspect_state = client
                .inspect_shard::<mz_repr::Timestamp>(&shard_id)
                .await
                .context("inspecting shard")
                .and_then(|state| serde_json::to_value(state).context("to json"))
                .and_then(|state| {
                    serde_json::from_value::<ShardState>(state).context("to shard state")
                });

            let result = match inspect_state {
                Ok(state) if state.is_tombstone() => RetryResult::Ok(()),
                Ok(state) => {
                    if retry_state.i == 0 {
                        print!("shard isn't tombstoned; sleeping to see if it gets cleaned up.");
                    }
                    if let Some(backoff) = retry_state.next_backoff {
                        if !backoff.is_zero() {
                            print!(" {:.0?}", backoff);
                        }
                    }
                    std::io::stdout().flush().expect("flushing stdout");

                    RetryResult::RetryableErr(anyhow!("non-tombstone state: {state:?}"))
                }
                Result::Err(e) => RetryResult::FatalErr(e),
            };

            (client, result)
        })
        .await;

    result
}

/// Parts of a shard's state that we read to determine if it's a tombstone.
#[derive(Debug, Serialize, Deserialize)]
struct ShardState {
    leased_readers: BTreeMap<String, serde_json::Value>,
    critical_readers: BTreeMap<String, serde_json::Value>,
    writers: BTreeMap<String, serde_json::Value>,
    since: Vec<mz_repr::Timestamp>,
    upper: Vec<mz_repr::Timestamp>,
}

impl ShardState {
    /// Returns if this shard is currently a tombstsone.
    fn is_tombstone(&self) -> bool {
        self.upper.is_empty()
            && self.since.is_empty()
            && self.writers.is_empty()
            && self.leased_readers.is_empty()
            && self.critical_readers.is_empty()
    }
}