#![warn(missing_docs)]
use std::fs::File;
use std::io::{self, Read, Write};
use std::path::Path;
use action::Run;
use anyhow::{anyhow, Context};
use mz_ore::error::ErrorExt;
use tempfile::NamedTempFile;
use tracing::debug;
use crate::action::ControlFlow;
use crate::error::{ErrorLocation, PosError};
use crate::parser::{BuiltinCommand, Command, LineReader};
mod action;
mod error;
mod format;
mod parser;
mod util;
pub use crate::action::consistency::Level as ConsistencyCheckLevel;
pub use crate::action::{CatalogConfig, Config};
pub use crate::error::Error;
pub async fn run_file(config: &Config, filename: &Path) -> Result<(), Error> {
let mut file =
File::open(filename).with_context(|| format!("opening {}", filename.display()))?;
let mut contents = String::new();
file.read_to_string(&mut contents)
.with_context(|| format!("reading {}", filename.display()))?;
run_string(config, Some(filename), &contents).await
}
pub async fn run_stdin(config: &Config) -> Result<(), Error> {
let mut contents = String::new();
io::stdin()
.read_to_string(&mut contents)
.context("reading <stdin>")?;
run_string(config, None, &contents).await
}
pub async fn run_string(
config: &Config,
filename: Option<&Path>,
contents: &str,
) -> Result<(), Error> {
if let Some(f) = filename {
println!("--- {}", f.display());
}
let mut line_reader = LineReader::new(contents);
run_line_reader(config, &mut line_reader, contents, filename)
.await
.map_err(|e| {
let location = e.pos.map(|pos| {
let (line, col) = line_reader.line_col(pos);
ErrorLocation::new(filename, contents, line, col)
});
Error::new(e.source, location)
})
}
pub(crate) async fn run_line_reader(
config: &Config,
line_reader: &mut LineReader<'_>,
contents: &str,
filename: Option<&Path>,
) -> Result<(), PosError> {
let cmds = parser::parse(line_reader)?;
if cmds.is_empty() {
return Err(PosError::from(anyhow!("No input provided!")));
} else {
debug!("Received {} commands to run", cmds.len());
}
let has_kafka_cmd = cmds.iter().any(|cmd| {
matches!(
&cmd.command,
Command::Builtin(BuiltinCommand { name, .. }, _) if name.starts_with("kafka-"),
)
});
let (mut state, state_cleanup) = action::create_state(config).await?;
if config.reset {
state.reset_materialize().await?;
if has_kafka_cmd {
state.reset_kafka().await?;
}
}
let mut errors = Vec::new();
let mut skipping = false;
for cmd in cmds {
if skipping {
if let Command::Builtin(builtin, _) = cmd.command {
if builtin.name == "skip-end" {
println!("skip-end reached");
skipping = false;
} else if builtin.name == "skip-if" {
errors.push(PosError {
source: anyhow!("nested skip-if not allowed"),
pos: Some(cmd.pos),
});
break;
}
}
continue;
}
match cmd.run(&mut state).await {
Ok(ControlFlow::Continue) => (),
Ok(ControlFlow::SkipBegin) => {
skipping = true;
()
}
Ok(ControlFlow::SkipEnd) => (),
Err(e) => {
errors.push(e);
break;
}
}
}
if config.consistency_checks == action::consistency::Level::File {
if let Err(e) = action::consistency::run_consistency_checks(&state).await {
errors.push(e.into());
}
}
state.clear_skip_consistency_checks();
if config.rewrite_results {
let mut f = NamedTempFile::new_in(filename.unwrap().parent().unwrap()).unwrap();
let mut pos = 0;
for rewrite in &state.rewrites {
write!(f, "{}", &contents[pos..rewrite.start]).expect("rewriting results");
write!(f, "{}", rewrite.content).expect("rewriting results");
pos = rewrite.end;
}
write!(f, "{}", &contents[pos..]).expect("rewriting results");
f.persist(filename.unwrap()).expect("rewriting results");
}
if config.reset {
drop(state);
if let Err(e) = state_cleanup.await {
errors.push(anyhow!("cleanup failed: error: {}", e.to_string_with_causes()).into());
}
}
if errors.is_empty() {
Ok(())
} else {
Err(errors.remove(0))
}
}