mz_testdrive/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Integration test driver for Materialize.
11
12#![warn(missing_docs)]
13
14use std::fs::File;
15use std::io::{self, Read, Write};
16use std::path::Path;
17
18use action::Run;
19use anyhow::{Context, anyhow};
20use mz_ore::error::ErrorExt;
21use tempfile::NamedTempFile;
22use tracing::debug;
23
24use crate::action::ControlFlow;
25use crate::error::{ErrorLocation, PosError};
26use crate::parser::{BuiltinCommand, Command, LineReader};
27
28mod action;
29mod error;
30mod format;
31mod parser;
32mod util;
33
34pub use crate::action::consistency::Level as ConsistencyCheckLevel;
35pub use crate::action::{CatalogConfig, Config};
36pub use crate::error::Error;
37
38/// Runs a testdrive script stored in a file.
39pub async fn run_file(config: &Config, filename: &Path) -> Result<(), Error> {
40    let mut file =
41        File::open(filename).with_context(|| format!("opening {}", filename.display()))?;
42    let mut contents = String::new();
43    file.read_to_string(&mut contents)
44        .with_context(|| format!("reading {}", filename.display()))?;
45    run_string(config, Some(filename), &contents).await
46}
47
48/// Runs a testdrive script from the standard input.
49pub async fn run_stdin(config: &Config) -> Result<(), Error> {
50    let mut contents = String::new();
51    io::stdin()
52        .read_to_string(&mut contents)
53        .context("reading <stdin>")?;
54    run_string(config, None, &contents).await
55}
56
57/// Runs a testdrive script stored in a string.
58///
59/// The script in `contents` is used verbatim. The provided `filename` is used
60/// only as output in error messages and such. No attempt is made to read
61/// `filename`.
62pub async fn run_string(
63    config: &Config,
64    filename: Option<&Path>,
65    contents: &str,
66) -> Result<(), Error> {
67    if let Some(f) = filename {
68        println!("--- {}", f.display());
69    }
70
71    let mut line_reader = LineReader::new(contents);
72    run_line_reader(config, &mut line_reader, contents, filename)
73        .await
74        .map_err(|e| {
75            let location = e.pos.map(|pos| {
76                let (line, col) = line_reader.line_col(pos);
77                ErrorLocation::new(filename, contents, line, col)
78            });
79            Error::new(e.source, location)
80        })
81}
82
83pub(crate) async fn run_line_reader(
84    config: &Config,
85    line_reader: &mut LineReader<'_>,
86    contents: &str,
87    filename: Option<&Path>,
88) -> Result<(), PosError> {
89    // TODO(benesch): consider sharing state between files, to avoid
90    // reconnections for every file. For now it's nice to not open any
91    // connections until after parsing.
92    let cmds = parser::parse(line_reader)?;
93
94    if cmds.is_empty() {
95        return Err(PosError::from(anyhow!("No input provided!")));
96    } else {
97        debug!("Received {} commands to run", cmds.len());
98    }
99
100    let has_kafka_cmd = cmds.iter().any(|cmd| {
101        matches!(
102            &cmd.command,
103            Command::Builtin(BuiltinCommand { name, .. }, _) if name.starts_with("kafka-"),
104        )
105    });
106
107    let (mut state, state_cleanup) = action::create_state(config).await?;
108
109    if config.reset {
110        // Delete any existing Materialize and Kafka state *before* the test
111        // script starts. We don't clean up Materialize or Kafka state at the
112        // end of the script because it's useful to leave the state around,
113        // e.g., for debugging, or when using a testdrive script to set up
114        // Materialize for further tinkering.
115
116        state.reset_materialize().await?;
117
118        // Only try to clean up Kafka state if the test script uses a Kafka
119        // action. Tests that don't use Kafka likely don't have a Kafka
120        // broker available.
121        if has_kafka_cmd {
122            state.reset_kafka().await?;
123        }
124    }
125
126    let mut errors = Vec::new();
127
128    let mut skipping = false;
129
130    for cmd in cmds {
131        if skipping {
132            if let Command::Builtin(builtin, _) = cmd.command {
133                if builtin.name == "skip-end" {
134                    println!("skip-end reached");
135                    skipping = false;
136                } else if builtin.name == "skip-if" {
137                    errors.push(PosError {
138                        source: anyhow!("nested skip-if not allowed"),
139                        pos: Some(cmd.pos),
140                    });
141                    break;
142                }
143            }
144            continue;
145        }
146
147        match cmd.run(&mut state).await {
148            Ok(ControlFlow::Continue) => (),
149            Ok(ControlFlow::SkipBegin) => {
150                skipping = true;
151                ()
152            }
153            // ignore, already handled above
154            Ok(ControlFlow::SkipEnd) => (),
155            Err(e) => {
156                errors.push(e);
157                break;
158            }
159        }
160    }
161    if config.consistency_checks == action::consistency::Level::File {
162        if let Err(e) = action::consistency::run_consistency_checks(&state).await {
163            errors.push(e.into());
164        }
165    }
166    state.clear_skip_consistency_checks();
167
168    if config.rewrite_results {
169        let mut f = NamedTempFile::new_in(filename.unwrap().parent().unwrap()).unwrap();
170        let mut pos = 0;
171        for rewrite in &state.rewrites {
172            write!(f, "{}", &contents[pos..rewrite.start]).expect("rewriting results");
173            write!(f, "{}", rewrite.content).expect("rewriting results");
174            pos = rewrite.end;
175        }
176        write!(f, "{}", &contents[pos..]).expect("rewriting results");
177        f.persist(filename.unwrap()).expect("rewriting results");
178    }
179
180    if config.reset {
181        drop(state);
182        if let Err(e) = state_cleanup.await {
183            errors.push(anyhow!("cleanup failed: error: {}", e.to_string_with_causes()).into());
184        }
185    }
186
187    if errors.is_empty() {
188        Ok(())
189    } else {
190        // Only surface the first error encountered for sake of simplicity
191        Err(errors.remove(0))
192    }
193}