1#![warn(missing_docs)]
13
14use std::fs::File;
15use std::io::{self, Read, Write};
16use std::path::Path;
17
18use action::Run;
19use anyhow::{Context, anyhow};
20use mz_ore::error::ErrorExt;
21use tempfile::NamedTempFile;
22use tracing::debug;
23
24use crate::action::ControlFlow;
25use crate::error::{ErrorLocation, PosError};
26use crate::parser::{BuiltinCommand, Command, LineReader};
27
28mod action;
29mod error;
30mod format;
31mod parser;
32mod util;
33
34pub use crate::action::consistency::Level as ConsistencyCheckLevel;
35pub use crate::action::{CatalogConfig, Config};
36pub use crate::error::Error;
37
38pub async fn run_file(config: &Config, filename: &Path) -> Result<(), Error> {
40 let mut file =
41 File::open(filename).with_context(|| format!("opening {}", filename.display()))?;
42 let mut contents = String::new();
43 file.read_to_string(&mut contents)
44 .with_context(|| format!("reading {}", filename.display()))?;
45 run_string(config, Some(filename), &contents).await
46}
47
48pub async fn run_stdin(config: &Config) -> Result<(), Error> {
50 let mut contents = String::new();
51 io::stdin()
52 .read_to_string(&mut contents)
53 .context("reading <stdin>")?;
54 run_string(config, None, &contents).await
55}
56
57pub async fn run_string(
63 config: &Config,
64 filename: Option<&Path>,
65 contents: &str,
66) -> Result<(), Error> {
67 if let Some(f) = filename {
68 println!("--- {}", f.display());
69 }
70
71 let mut line_reader = LineReader::new(contents);
72 run_line_reader(config, &mut line_reader, contents, filename)
73 .await
74 .map_err(|e| {
75 let location = e.pos.map(|pos| {
76 let (line, col) = line_reader.line_col(pos);
77 ErrorLocation::new(filename, contents, line, col)
78 });
79 Error::new(e.source, location)
80 })
81}
82
83pub(crate) async fn run_line_reader(
84 config: &Config,
85 line_reader: &mut LineReader<'_>,
86 contents: &str,
87 filename: Option<&Path>,
88) -> Result<(), PosError> {
89 let cmds = parser::parse(line_reader)?;
93
94 if cmds.is_empty() {
95 return Err(PosError::from(anyhow!("No input provided!")));
96 } else {
97 debug!("Received {} commands to run", cmds.len());
98 }
99
100 let has_kafka_cmd = cmds.iter().any(|cmd| {
101 matches!(
102 &cmd.command,
103 Command::Builtin(BuiltinCommand { name, .. }, _) if name.starts_with("kafka-"),
104 )
105 });
106
107 let (mut state, state_cleanup) = action::create_state(config).await?;
108
109 if config.reset {
110 state.reset_materialize().await?;
117
118 if has_kafka_cmd {
122 state.reset_kafka().await?;
123 }
124 }
125
126 let mut errors = Vec::new();
127
128 let mut skipping = false;
129
130 for cmd in cmds {
131 if skipping {
132 if let Command::Builtin(builtin, _) = cmd.command {
133 if builtin.name == "skip-end" {
134 println!("skip-end reached");
135 skipping = false;
136 } else if builtin.name == "skip-if" {
137 errors.push(PosError {
138 source: anyhow!("nested skip-if not allowed"),
139 pos: Some(cmd.pos),
140 });
141 break;
142 }
143 }
144 continue;
145 }
146
147 match cmd.run(&mut state).await {
148 Ok(ControlFlow::Continue) => (),
149 Ok(ControlFlow::SkipBegin) => {
150 skipping = true;
151 ()
152 }
153 Ok(ControlFlow::SkipEnd) => (),
155 Err(e) => {
156 errors.push(e);
157 break;
158 }
159 }
160 }
161 let mut consistency_checks_succeeded = true;
162 if config.consistency_checks == action::consistency::Level::File {
163 if let Err(e) = action::consistency::run_consistency_checks(&state).await {
164 consistency_checks_succeeded = false;
165 errors.push(e.into());
166 }
167 }
168 state.clear_skip_consistency_checks();
169
170 if config.rewrite_results && consistency_checks_succeeded {
171 let mut f = NamedTempFile::new_in(filename.unwrap().parent().unwrap()).unwrap();
172 let mut pos = 0;
173 for rewrite in &state.rewrites {
174 write!(f, "{}", &contents[pos..rewrite.start]).expect("rewriting results");
175 write!(f, "{}", rewrite.content).expect("rewriting results");
176 pos = rewrite.end;
177 }
178 write!(f, "{}", &contents[pos..]).expect("rewriting results");
179 f.persist(filename.unwrap()).expect("rewriting results");
180 }
181
182 if config.reset {
183 drop(state);
184 if let Err(e) = state_cleanup.await {
185 errors.push(anyhow!("cleanup failed: error: {}", e.to_string_with_causes()).into());
186 }
187 }
188
189 if errors.is_empty() {
190 Ok(())
191 } else {
192 Err(errors.remove(0))
194 }
195}