mz_testdrive/action/
file.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::path::{self, PathBuf};
11use std::str::FromStr;
12
13use anyhow::bail;
14use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder};
15use tokio::fs::{self, OpenOptions};
16use tokio::io::{AsyncWrite, AsyncWriteExt};
17
18use crate::action::{ControlFlow, State};
19use crate::format::bytes;
20use crate::parser::BuiltinCommand;
21
/// Compression format applied to data written by the file actions.
///
/// The variants correspond to the names accepted by the `FromStr`
/// implementation: `bzip2`, `gzip`, `xz`, `zstd`, and `none`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// Compress with bzip2.
    Bzip2,
    /// Compress with gzip.
    Gzip,
    /// Compress with xz.
    Xz,
    /// Compress with Zstandard.
    Zstd,
    /// Write the data uncompressed.
    None,
}
29
30impl FromStr for Compression {
31    type Err = anyhow::Error;
32
33    fn from_str(s: &str) -> Result<Self, anyhow::Error> {
34        match s {
35            "bzip2" => Ok(Compression::Bzip2),
36            "gzip" => Ok(Compression::Gzip),
37            "xz" => Ok(Compression::Xz),
38            "zstd" => Ok(Compression::Zstd),
39            "none" => Ok(Compression::None),
40            f => bail!("unknown compression format: {}", f),
41        }
42    }
43}
44
45pub(crate) fn build_compression(cmd: &mut BuiltinCommand) -> Result<Compression, anyhow::Error> {
46    match cmd.args.opt_string("compression") {
47        Some(s) => s.parse(),
48        None => Ok(Compression::None),
49    }
50}
51
52/// Returns an iterator of lines that form the content of a file.
53pub(crate) fn build_contents(
54    cmd: &mut BuiltinCommand,
55) -> Result<Box<dyn Iterator<Item = Vec<u8>> + Send + Sync + 'static>, anyhow::Error> {
56    let header = cmd.args.opt_string("header");
57    let trailing_newline = cmd.args.opt_bool("trailing-newline")?.unwrap_or(true);
58    let repeat = cmd.args.opt_parse("repeat")?.unwrap_or(1);
59
60    // Collect our contents into a buffer.
61    let mut contents = vec![];
62    for line in &cmd.input {
63        contents.push(bytes::unescape(line.as_bytes())?);
64    }
65    if !trailing_newline {
66        contents.pop();
67    }
68
69    let header_line = header.into_iter().map(|val| val.as_bytes().to_vec());
70    let content_lines = std::iter::repeat_n(contents, repeat).flatten();
71
72    Ok(Box::new(header_line.chain(content_lines)))
73}
74
75fn build_path(state: &State, cmd: &mut BuiltinCommand) -> Result<PathBuf, anyhow::Error> {
76    let path = cmd.args.string("path")?;
77    let container = cmd.args.opt_string("container");
78
79    if path.contains(path::MAIN_SEPARATOR) {
80        // The goal isn't security, but preventing mistakes.
81        bail!("separators in paths are forbidden")
82    }
83
84    match container.as_deref() {
85        None => Ok(state.temp_path.join(path)),
86        Some("fivetran") => Ok(PathBuf::from(&state.fivetran_destination_files_path).join(path)),
87        Some(x) => bail!("Unrecognized container '{x}'"),
88    }
89}
90
91pub async fn run_append(
92    mut cmd: BuiltinCommand,
93    state: &State,
94) -> Result<ControlFlow, anyhow::Error> {
95    let path = build_path(state, &mut cmd)?;
96    let compression = build_compression(&mut cmd)?;
97    let contents = build_contents(&mut cmd)?;
98    cmd.args.done()?;
99
100    println!("Appending to file {}", path.display());
101    let file = OpenOptions::new()
102        .create(true)
103        .append(true)
104        .open(&path)
105        .await?;
106
107    let mut file: Box<dyn AsyncWrite + Unpin + Send> = match compression {
108        Compression::Gzip => Box::new(GzipEncoder::new(file)),
109        Compression::Bzip2 => Box::new(BzEncoder::new(file)),
110        Compression::Xz => Box::new(XzEncoder::new(file)),
111        Compression::Zstd => Box::new(ZstdEncoder::new(file)),
112        Compression::None => Box::new(file),
113    };
114
115    for line in contents {
116        file.write_all(&line).await?;
117        file.write_all("\n".as_bytes()).await?;
118    }
119    file.shutdown().await?;
120
121    Ok(ControlFlow::Continue)
122}
123
124pub async fn run_delete(
125    mut cmd: BuiltinCommand,
126    state: &State,
127) -> Result<ControlFlow, anyhow::Error> {
128    let path = build_path(state, &mut cmd)?;
129    cmd.args.done()?;
130    println!("Deleting file {}", path.display());
131    fs::remove_file(&path).await?;
132    Ok(ControlFlow::Continue)
133}