Skip to main content

mz_testdrive/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Integration test driver for Materialize.
11
12#![recursion_limit = "256"]
13#![warn(missing_docs)]
14
15use std::fs::File;
16use std::io::{self, Read, Write};
17use std::path::Path;
18
19use action::Run;
20use anyhow::{Context, anyhow};
21use mz_ore::error::ErrorExt;
22use tempfile::NamedTempFile;
23use tracing::debug;
24
25use crate::action::ControlFlow;
26use crate::error::{ErrorLocation, PosError};
27use crate::parser::{BuiltinCommand, Command, LineReader};
28
29mod action;
30mod error;
31mod format;
32mod parser;
33mod util;
34
35pub use crate::action::consistency::Level as ConsistencyCheckLevel;
36pub use crate::action::{CatalogConfig, Config};
37pub use crate::error::Error;
38
39/// Runs a testdrive script stored in a file.
40pub async fn run_file(config: &Config, filename: &Path) -> Result<(), Error> {
41    let mut file =
42        File::open(filename).with_context(|| format!("opening {}", filename.display()))?;
43    let mut contents = String::new();
44    file.read_to_string(&mut contents)
45        .with_context(|| format!("reading {}", filename.display()))?;
46    run_string(config, Some(filename), &contents).await
47}
48
49/// Runs a testdrive script from the standard input.
50pub async fn run_stdin(config: &Config) -> Result<(), Error> {
51    let mut contents = String::new();
52    io::stdin()
53        .read_to_string(&mut contents)
54        .context("reading <stdin>")?;
55    run_string(config, None, &contents).await
56}
57
58/// Runs a testdrive script stored in a string.
59///
60/// The script in `contents` is used verbatim. The provided `filename` is used
61/// only as output in error messages and such. No attempt is made to read
62/// `filename`.
63pub async fn run_string(
64    config: &Config,
65    filename: Option<&Path>,
66    contents: &str,
67) -> Result<(), Error> {
68    if let Some(f) = filename {
69        println!("--- {}", f.display());
70    }
71
72    let mut line_reader = LineReader::new(contents);
73    run_line_reader(config, &mut line_reader, contents, filename)
74        .await
75        .map_err(|e| {
76            let location = e.pos.map(|pos| {
77                let (line, col) = line_reader.line_col(pos);
78                ErrorLocation::new(filename, contents, line, col)
79            });
80            Error::new(e.source, location)
81        })
82}
83
84pub(crate) async fn run_line_reader(
85    config: &Config,
86    line_reader: &mut LineReader<'_>,
87    contents: &str,
88    filename: Option<&Path>,
89) -> Result<(), PosError> {
90    // TODO(benesch): consider sharing state between files, to avoid
91    // reconnections for every file. For now it's nice to not open any
92    // connections until after parsing.
93    let cmds = parser::parse(line_reader)?;
94
95    if cmds.is_empty() {
96        return Err(PosError::from(anyhow!("No input provided!")));
97    } else {
98        debug!("Received {} commands to run", cmds.len());
99    }
100
101    let has_kafka_cmd = cmds.iter().any(|cmd| {
102        matches!(
103            &cmd.command,
104            Command::Builtin(BuiltinCommand { name, .. }, _) if name.starts_with("kafka-"),
105        )
106    });
107
108    let (mut state, state_cleanup) = action::create_state(config).await?;
109
110    if config.reset {
111        // Delete any existing Materialize and Kafka state *before* the test
112        // script starts. We don't clean up Materialize or Kafka state at the
113        // end of the script because it's useful to leave the state around,
114        // e.g., for debugging, or when using a testdrive script to set up
115        // Materialize for further tinkering.
116
117        state.reset_materialize().await?;
118
119        // Only try to clean up Kafka state if the test script uses a Kafka
120        // action. Tests that don't use Kafka likely don't have a Kafka
121        // broker available.
122        if has_kafka_cmd {
123            state.reset_kafka().await?;
124        }
125    }
126
127    let mut errors = Vec::new();
128
129    let mut skipping = false;
130
131    for cmd in cmds {
132        if skipping {
133            if let Command::Builtin(builtin, _) = cmd.command {
134                if builtin.name == "skip-end" {
135                    println!("skip-end reached");
136                    skipping = false;
137                } else if builtin.name == "skip-if" {
138                    errors.push(PosError {
139                        source: anyhow!("nested skip-if not allowed"),
140                        pos: Some(cmd.pos),
141                    });
142                    break;
143                }
144            }
145            continue;
146        }
147
148        match cmd.run(&mut state).await {
149            Ok(ControlFlow::Continue) => (),
150            Ok(ControlFlow::SkipBegin) => {
151                skipping = true;
152                ()
153            }
154            // ignore, already handled above
155            Ok(ControlFlow::SkipEnd) => (),
156            Err(e) => {
157                errors.push(e);
158                break;
159            }
160        }
161    }
162    let mut consistency_checks_succeeded = true;
163    if config.consistency_checks == action::consistency::Level::File {
164        if let Err(e) = action::consistency::run_consistency_checks(&state).await {
165            consistency_checks_succeeded = false;
166            errors.push(e.into());
167        }
168    }
169    state.clear_skip_consistency_checks();
170
171    if config.rewrite_results && consistency_checks_succeeded {
172        let mut f = NamedTempFile::new_in(filename.unwrap().parent().unwrap()).unwrap();
173        let mut pos = 0;
174        for rewrite in &state.rewrites {
175            write!(f, "{}", &contents[pos..rewrite.start]).expect("rewriting results");
176            write!(f, "{}", rewrite.content).expect("rewriting results");
177            pos = rewrite.end;
178        }
179        write!(f, "{}", &contents[pos..]).expect("rewriting results");
180        f.persist(filename.unwrap()).expect("rewriting results");
181    }
182
183    if config.reset {
184        drop(state);
185        if let Err(e) = state_cleanup.await {
186            errors.push(anyhow!("cleanup failed: error: {}", e.to_string_with_causes()).into());
187        }
188    }
189
190    if errors.is_empty() {
191        Ok(())
192    } else {
193        // Only surface the first error encountered for sake of simplicity
194        Err(errors.remove(0))
195    }
196}