mz_testdrive/action/
file.rs
1use std::path::{self, PathBuf};
11use std::str::FromStr;
12
13use anyhow::bail;
14use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder};
15use tokio::fs::{self, OpenOptions};
16use tokio::io::{AsyncWrite, AsyncWriteExt};
17
18use crate::action::{ControlFlow, State};
19use crate::format::bytes;
20use crate::parser::BuiltinCommand;
21
/// Compression codec applied to the bytes written by the file actions.
///
/// `None` means the data is written as-is; every other variant selects the
/// encoder of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// bzip2 compression.
    Bzip2,
    /// gzip compression.
    Gzip,
    /// xz compression.
    Xz,
    /// Zstandard compression.
    Zstd,
    /// No compression.
    None,
}
29
30impl FromStr for Compression {
31 type Err = anyhow::Error;
32
33 fn from_str(s: &str) -> Result<Self, anyhow::Error> {
34 match s {
35 "bzip2" => Ok(Compression::Bzip2),
36 "gzip" => Ok(Compression::Gzip),
37 "xz" => Ok(Compression::Xz),
38 "zstd" => Ok(Compression::Zstd),
39 "none" => Ok(Compression::None),
40 f => bail!("unknown compression format: {}", f),
41 }
42 }
43}
44
45pub(crate) fn build_compression(cmd: &mut BuiltinCommand) -> Result<Compression, anyhow::Error> {
46 match cmd.args.opt_string("compression") {
47 Some(s) => s.parse(),
48 None => Ok(Compression::None),
49 }
50}
51
52pub(crate) fn build_contents(
54 cmd: &mut BuiltinCommand,
55) -> Result<Box<dyn Iterator<Item = Vec<u8>> + Send + Sync + 'static>, anyhow::Error> {
56 let header = cmd.args.opt_string("header");
57 let trailing_newline = cmd.args.opt_bool("trailing-newline")?.unwrap_or(true);
58 let repeat = cmd.args.opt_parse("repeat")?.unwrap_or(1);
59
60 let mut contents = vec![];
62 for line in &cmd.input {
63 contents.push(bytes::unescape(line.as_bytes())?);
64 }
65 if !trailing_newline {
66 contents.pop();
67 }
68
69 let header_line = header.into_iter().map(|val| val.as_bytes().to_vec());
70 let content_lines = std::iter::repeat_n(contents, repeat).flatten();
71
72 Ok(Box::new(header_line.chain(content_lines)))
73}
74
75fn build_path(state: &State, cmd: &mut BuiltinCommand) -> Result<PathBuf, anyhow::Error> {
76 let path = cmd.args.string("path")?;
77 let container = cmd.args.opt_string("container");
78
79 if path.contains(path::MAIN_SEPARATOR) {
80 bail!("separators in paths are forbidden")
82 }
83
84 match container.as_deref() {
85 None => Ok(state.temp_path.join(path)),
86 Some("fivetran") => Ok(PathBuf::from(&state.fivetran_destination_files_path).join(path)),
87 Some(x) => bail!("Unrecognized container '{x}'"),
88 }
89}
90
91pub async fn run_append(
92 mut cmd: BuiltinCommand,
93 state: &State,
94) -> Result<ControlFlow, anyhow::Error> {
95 let path = build_path(state, &mut cmd)?;
96 let compression = build_compression(&mut cmd)?;
97 let contents = build_contents(&mut cmd)?;
98 cmd.args.done()?;
99
100 println!("Appending to file {}", path.display());
101 let file = OpenOptions::new()
102 .create(true)
103 .append(true)
104 .open(&path)
105 .await?;
106
107 let mut file: Box<dyn AsyncWrite + Unpin + Send> = match compression {
108 Compression::Gzip => Box::new(GzipEncoder::new(file)),
109 Compression::Bzip2 => Box::new(BzEncoder::new(file)),
110 Compression::Xz => Box::new(XzEncoder::new(file)),
111 Compression::Zstd => Box::new(ZstdEncoder::new(file)),
112 Compression::None => Box::new(file),
113 };
114
115 for line in contents {
116 file.write_all(&line).await?;
117 file.write_all("\n".as_bytes()).await?;
118 }
119 file.shutdown().await?;
120
121 Ok(ControlFlow::Continue)
122}
123
124pub async fn run_delete(
125 mut cmd: BuiltinCommand,
126 state: &State,
127) -> Result<ControlFlow, anyhow::Error> {
128 let path = build_path(state, &mut cmd)?;
129 cmd.args.done()?;
130 println!("Deleting file {}", path.display());
131 fs::remove_file(&path).await?;
132 Ok(ControlFlow::Continue)
133}