mz_testdrive/
lib.rs
1#![warn(missing_docs)]
13
14use std::fs::File;
15use std::io::{self, Read, Write};
16use std::path::Path;
17
18use action::Run;
19use anyhow::{Context, anyhow};
20use mz_ore::error::ErrorExt;
21use tempfile::NamedTempFile;
22use tracing::debug;
23
24use crate::action::ControlFlow;
25use crate::error::{ErrorLocation, PosError};
26use crate::parser::{BuiltinCommand, Command, LineReader};
27
28mod action;
29mod error;
30mod format;
31mod parser;
32mod util;
33
34pub use crate::action::consistency::Level as ConsistencyCheckLevel;
35pub use crate::action::{CatalogConfig, Config};
36pub use crate::error::Error;
37
38pub async fn run_file(config: &Config, filename: &Path) -> Result<(), Error> {
40 let mut file =
41 File::open(filename).with_context(|| format!("opening {}", filename.display()))?;
42 let mut contents = String::new();
43 file.read_to_string(&mut contents)
44 .with_context(|| format!("reading {}", filename.display()))?;
45 run_string(config, Some(filename), &contents).await
46}
47
48pub async fn run_stdin(config: &Config) -> Result<(), Error> {
50 let mut contents = String::new();
51 io::stdin()
52 .read_to_string(&mut contents)
53 .context("reading <stdin>")?;
54 run_string(config, None, &contents).await
55}
56
57pub async fn run_string(
63 config: &Config,
64 filename: Option<&Path>,
65 contents: &str,
66) -> Result<(), Error> {
67 if let Some(f) = filename {
68 println!("--- {}", f.display());
69 }
70
71 let mut line_reader = LineReader::new(contents);
72 run_line_reader(config, &mut line_reader, contents, filename)
73 .await
74 .map_err(|e| {
75 let location = e.pos.map(|pos| {
76 let (line, col) = line_reader.line_col(pos);
77 ErrorLocation::new(filename, contents, line, col)
78 });
79 Error::new(e.source, location)
80 })
81}
82
83pub(crate) async fn run_line_reader(
84 config: &Config,
85 line_reader: &mut LineReader<'_>,
86 contents: &str,
87 filename: Option<&Path>,
88) -> Result<(), PosError> {
89 let cmds = parser::parse(line_reader)?;
93
94 if cmds.is_empty() {
95 return Err(PosError::from(anyhow!("No input provided!")));
96 } else {
97 debug!("Received {} commands to run", cmds.len());
98 }
99
100 let has_kafka_cmd = cmds.iter().any(|cmd| {
101 matches!(
102 &cmd.command,
103 Command::Builtin(BuiltinCommand { name, .. }, _) if name.starts_with("kafka-"),
104 )
105 });
106
107 let (mut state, state_cleanup) = action::create_state(config).await?;
108
109 if config.reset {
110 state.reset_materialize().await?;
117
118 if has_kafka_cmd {
122 state.reset_kafka().await?;
123 }
124 }
125
126 let mut errors = Vec::new();
127
128 let mut skipping = false;
129
130 for cmd in cmds {
131 if skipping {
132 if let Command::Builtin(builtin, _) = cmd.command {
133 if builtin.name == "skip-end" {
134 println!("skip-end reached");
135 skipping = false;
136 } else if builtin.name == "skip-if" {
137 errors.push(PosError {
138 source: anyhow!("nested skip-if not allowed"),
139 pos: Some(cmd.pos),
140 });
141 break;
142 }
143 }
144 continue;
145 }
146
147 match cmd.run(&mut state).await {
148 Ok(ControlFlow::Continue) => (),
149 Ok(ControlFlow::SkipBegin) => {
150 skipping = true;
151 ()
152 }
153 Ok(ControlFlow::SkipEnd) => (),
155 Err(e) => {
156 errors.push(e);
157 break;
158 }
159 }
160 }
161 if config.consistency_checks == action::consistency::Level::File {
162 if let Err(e) = action::consistency::run_consistency_checks(&state).await {
163 errors.push(e.into());
164 }
165 }
166 state.clear_skip_consistency_checks();
167
168 if config.rewrite_results {
169 let mut f = NamedTempFile::new_in(filename.unwrap().parent().unwrap()).unwrap();
170 let mut pos = 0;
171 for rewrite in &state.rewrites {
172 write!(f, "{}", &contents[pos..rewrite.start]).expect("rewriting results");
173 write!(f, "{}", rewrite.content).expect("rewriting results");
174 pos = rewrite.end;
175 }
176 write!(f, "{}", &contents[pos..]).expect("rewriting results");
177 f.persist(filename.unwrap()).expect("rewriting results");
178 }
179
180 if config.reset {
181 drop(state);
182 if let Err(e) = state_cleanup.await {
183 errors.push(anyhow!("cleanup failed: error: {}", e.to_string_with_causes()).into());
184 }
185 }
186
187 if errors.is_empty() {
188 Ok(())
189 } else {
190 Err(errors.remove(0))
192 }
193}