1#![recursion_limit = "256"]
13#![warn(missing_docs)]
14
15use std::fs::File;
16use std::io::{self, Read, Write};
17use std::path::Path;
18
19use action::Run;
20use anyhow::{Context, anyhow};
21use mz_ore::error::ErrorExt;
22use tempfile::NamedTempFile;
23use tracing::debug;
24
25use crate::action::ControlFlow;
26use crate::error::{ErrorLocation, PosError};
27use crate::parser::{BuiltinCommand, Command, LineReader};
28
29mod action;
30mod error;
31mod format;
32mod parser;
33mod util;
34
35pub use crate::action::consistency::Level as ConsistencyCheckLevel;
36pub use crate::action::{CatalogConfig, Config};
37pub use crate::error::Error;
38
39pub async fn run_file(config: &Config, filename: &Path) -> Result<(), Error> {
41 let mut file =
42 File::open(filename).with_context(|| format!("opening {}", filename.display()))?;
43 let mut contents = String::new();
44 file.read_to_string(&mut contents)
45 .with_context(|| format!("reading {}", filename.display()))?;
46 run_string(config, Some(filename), &contents).await
47}
48
49pub async fn run_stdin(config: &Config) -> Result<(), Error> {
51 let mut contents = String::new();
52 io::stdin()
53 .read_to_string(&mut contents)
54 .context("reading <stdin>")?;
55 run_string(config, None, &contents).await
56}
57
58pub async fn run_string(
64 config: &Config,
65 filename: Option<&Path>,
66 contents: &str,
67) -> Result<(), Error> {
68 if let Some(f) = filename {
69 println!("--- {}", f.display());
70 }
71
72 let mut line_reader = LineReader::new(contents);
73 run_line_reader(config, &mut line_reader, contents, filename)
74 .await
75 .map_err(|e| {
76 let location = e.pos.map(|pos| {
77 let (line, col) = line_reader.line_col(pos);
78 ErrorLocation::new(filename, contents, line, col)
79 });
80 Error::new(e.source, location)
81 })
82}
83
84pub(crate) async fn run_line_reader(
85 config: &Config,
86 line_reader: &mut LineReader<'_>,
87 contents: &str,
88 filename: Option<&Path>,
89) -> Result<(), PosError> {
90 let cmds = parser::parse(line_reader)?;
94
95 if cmds.is_empty() {
96 return Err(PosError::from(anyhow!("No input provided!")));
97 } else {
98 debug!("Received {} commands to run", cmds.len());
99 }
100
101 let has_kafka_cmd = cmds.iter().any(|cmd| {
102 matches!(
103 &cmd.command,
104 Command::Builtin(BuiltinCommand { name, .. }, _) if name.starts_with("kafka-"),
105 )
106 });
107
108 let (mut state, state_cleanup) = action::create_state(config).await?;
109
110 if config.reset {
111 state.reset_materialize().await?;
118
119 if has_kafka_cmd {
123 state.reset_kafka().await?;
124 }
125 }
126
127 let mut errors = Vec::new();
128
129 let mut skipping = false;
130
131 for cmd in cmds {
132 if skipping {
133 if let Command::Builtin(builtin, _) = cmd.command {
134 if builtin.name == "skip-end" {
135 println!("skip-end reached");
136 skipping = false;
137 } else if builtin.name == "skip-if" {
138 errors.push(PosError {
139 source: anyhow!("nested skip-if not allowed"),
140 pos: Some(cmd.pos),
141 });
142 break;
143 }
144 }
145 continue;
146 }
147
148 match cmd.run(&mut state).await {
149 Ok(ControlFlow::Continue) => (),
150 Ok(ControlFlow::SkipBegin) => {
151 skipping = true;
152 ()
153 }
154 Ok(ControlFlow::SkipEnd) => (),
156 Err(e) => {
157 errors.push(e);
158 break;
159 }
160 }
161 }
162 let mut consistency_checks_succeeded = true;
163 if config.consistency_checks == action::consistency::Level::File {
164 if let Err(e) = action::consistency::run_consistency_checks(&state).await {
165 consistency_checks_succeeded = false;
166 errors.push(e.into());
167 }
168 }
169 state.clear_skip_consistency_checks();
170
171 if config.rewrite_results && consistency_checks_succeeded {
172 let mut f = NamedTempFile::new_in(filename.unwrap().parent().unwrap()).unwrap();
173 let mut pos = 0;
174 for rewrite in &state.rewrites {
175 write!(f, "{}", &contents[pos..rewrite.start]).expect("rewriting results");
176 write!(f, "{}", rewrite.content).expect("rewriting results");
177 pos = rewrite.end;
178 }
179 write!(f, "{}", &contents[pos..]).expect("rewriting results");
180 f.persist(filename.unwrap()).expect("rewriting results");
181 }
182
183 if config.reset {
184 drop(state);
185 if let Err(e) = state_cleanup.await {
186 errors.push(anyhow!("cleanup failed: error: {}", e.to_string_with_causes()).into());
187 }
188 }
189
190 if errors.is_empty() {
191 Ok(())
192 } else {
193 Err(errors.remove(0))
195 }
196}