mz_testdrive/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Integration test driver for Materialize.
11
12#![warn(missing_docs)]
13
14use std::fs::File;
15use std::io::{self, Read, Write};
16use std::path::Path;
17
18use action::Run;
19use anyhow::{Context, anyhow};
20use mz_ore::error::ErrorExt;
21use tempfile::NamedTempFile;
22use tracing::debug;
23
24use crate::action::ControlFlow;
25use crate::error::{ErrorLocation, PosError};
26use crate::parser::{BuiltinCommand, Command, LineReader};
27
28mod action;
29mod error;
30mod format;
31mod parser;
32mod util;
33
34pub use crate::action::consistency::Level as ConsistencyCheckLevel;
35pub use crate::action::{CatalogConfig, Config};
36pub use crate::error::Error;
37
38/// Runs a testdrive script stored in a file.
39pub async fn run_file(config: &Config, filename: &Path) -> Result<(), Error> {
40    let mut file =
41        File::open(filename).with_context(|| format!("opening {}", filename.display()))?;
42    let mut contents = String::new();
43    file.read_to_string(&mut contents)
44        .with_context(|| format!("reading {}", filename.display()))?;
45    run_string(config, Some(filename), &contents).await
46}
47
48/// Runs a testdrive script from the standard input.
49pub async fn run_stdin(config: &Config) -> Result<(), Error> {
50    let mut contents = String::new();
51    io::stdin()
52        .read_to_string(&mut contents)
53        .context("reading <stdin>")?;
54    run_string(config, None, &contents).await
55}
56
57/// Runs a testdrive script stored in a string.
58///
59/// The script in `contents` is used verbatim. The provided `filename` is used
60/// only as output in error messages and such. No attempt is made to read
61/// `filename`.
62pub async fn run_string(
63    config: &Config,
64    filename: Option<&Path>,
65    contents: &str,
66) -> Result<(), Error> {
67    if let Some(f) = filename {
68        println!("--- {}", f.display());
69    }
70
71    let mut line_reader = LineReader::new(contents);
72    run_line_reader(config, &mut line_reader, contents, filename)
73        .await
74        .map_err(|e| {
75            let location = e.pos.map(|pos| {
76                let (line, col) = line_reader.line_col(pos);
77                ErrorLocation::new(filename, contents, line, col)
78            });
79            Error::new(e.source, location)
80        })
81}
82
83pub(crate) async fn run_line_reader(
84    config: &Config,
85    line_reader: &mut LineReader<'_>,
86    contents: &str,
87    filename: Option<&Path>,
88) -> Result<(), PosError> {
89    // TODO(benesch): consider sharing state between files, to avoid
90    // reconnections for every file. For now it's nice to not open any
91    // connections until after parsing.
92    let cmds = parser::parse(line_reader)?;
93
94    if cmds.is_empty() {
95        return Err(PosError::from(anyhow!("No input provided!")));
96    } else {
97        debug!("Received {} commands to run", cmds.len());
98    }
99
100    let has_kafka_cmd = cmds.iter().any(|cmd| {
101        matches!(
102            &cmd.command,
103            Command::Builtin(BuiltinCommand { name, .. }, _) if name.starts_with("kafka-"),
104        )
105    });
106
107    let (mut state, state_cleanup) = action::create_state(config).await?;
108
109    if config.reset {
110        // Delete any existing Materialize and Kafka state *before* the test
111        // script starts. We don't clean up Materialize or Kafka state at the
112        // end of the script because it's useful to leave the state around,
113        // e.g., for debugging, or when using a testdrive script to set up
114        // Materialize for further tinkering.
115
116        state.reset_materialize().await?;
117
118        // Only try to clean up Kafka state if the test script uses a Kafka
119        // action. Tests that don't use Kafka likely don't have a Kafka
120        // broker available.
121        if has_kafka_cmd {
122            state.reset_kafka().await?;
123        }
124    }
125
126    let mut errors = Vec::new();
127
128    let mut skipping = false;
129
130    for cmd in cmds {
131        if skipping {
132            if let Command::Builtin(builtin, _) = cmd.command {
133                if builtin.name == "skip-end" {
134                    println!("skip-end reached");
135                    skipping = false;
136                } else if builtin.name == "skip-if" {
137                    errors.push(PosError {
138                        source: anyhow!("nested skip-if not allowed"),
139                        pos: Some(cmd.pos),
140                    });
141                    break;
142                }
143            }
144            continue;
145        }
146
147        match cmd.run(&mut state).await {
148            Ok(ControlFlow::Continue) => (),
149            Ok(ControlFlow::SkipBegin) => {
150                skipping = true;
151                ()
152            }
153            // ignore, already handled above
154            Ok(ControlFlow::SkipEnd) => (),
155            Err(e) => {
156                errors.push(e);
157                break;
158            }
159        }
160    }
161    let mut consistency_checks_succeeded = true;
162    if config.consistency_checks == action::consistency::Level::File {
163        if let Err(e) = action::consistency::run_consistency_checks(&state).await {
164            consistency_checks_succeeded = false;
165            errors.push(e.into());
166        }
167    }
168    state.clear_skip_consistency_checks();
169
170    if config.rewrite_results && consistency_checks_succeeded {
171        let mut f = NamedTempFile::new_in(filename.unwrap().parent().unwrap()).unwrap();
172        let mut pos = 0;
173        for rewrite in &state.rewrites {
174            write!(f, "{}", &contents[pos..rewrite.start]).expect("rewriting results");
175            write!(f, "{}", rewrite.content).expect("rewriting results");
176            pos = rewrite.end;
177        }
178        write!(f, "{}", &contents[pos..]).expect("rewriting results");
179        f.persist(filename.unwrap()).expect("rewriting results");
180    }
181
182    if config.reset {
183        drop(state);
184        if let Err(e) = state_cleanup.await {
185            errors.push(anyhow!("cleanup failed: error: {}", e.to_string_with_causes()).into());
186        }
187    }
188
189    if errors.is_empty() {
190        Ok(())
191    } else {
192        // Only surface the first error encountered for sake of simplicity
193        Err(errors.remove(0))
194    }
195}