mz_testdrive/action/
s3.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::pin::Pin;
11use std::str;
12use std::thread;
13use std::time::Duration;
14
15use anyhow::Context;
16use anyhow::bail;
17use arrow::util::display::ArrayFormatter;
18use arrow::util::display::FormatOptions;
19use async_compression::tokio::bufread::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder};
20use regex::Regex;
21use tokio::io::{AsyncRead, AsyncReadExt};
22
23use crate::action::file::Compression;
24use crate::action::file::build_compression;
25use crate::action::file::build_contents;
26use crate::action::{ControlFlow, State};
27use crate::parser::BuiltinCommand;
28
29pub async fn run_verify_data(
30    mut cmd: BuiltinCommand,
31    state: &State,
32) -> Result<ControlFlow, anyhow::Error> {
33    let mut expected_body = cmd
34        .input
35        .into_iter()
36        // Strip suffix to allow lines with trailing whitespace
37        .map(|line| {
38            line.trim_end_matches("// allow-trailing-whitespace")
39                .to_string()
40        })
41        .collect::<Vec<String>>();
42    let bucket: String = cmd.args.parse("bucket")?;
43    let key: String = cmd.args.parse("key")?;
44    let sort_rows = cmd.args.opt_bool("sort-rows")?.unwrap_or(false);
45    cmd.args.done()?;
46
47    println!("Verifying contents of S3 bucket {bucket} key {key}...");
48
49    let client = mz_aws_util::s3::new_client(&state.aws_config);
50
51    // List the path until the INCOMPLETE sentinel file disappears so we know the
52    // data is complete.
53    let mut attempts = 0;
54    let all_files;
55    loop {
56        attempts += 1;
57        if attempts > 10 {
58            bail!("found incomplete sentinel file in path {key} after 10 attempts")
59        }
60
61        let files = client
62            .list_objects_v2()
63            .bucket(&bucket)
64            .prefix(&format!("{}/", key))
65            .send()
66            .await?;
67        match files.contents {
68            Some(files)
69                if files
70                    .iter()
71                    .any(|obj| obj.key().map_or(false, |key| key.contains("INCOMPLETE"))) =>
72            {
73                thread::sleep(Duration::from_secs(1))
74            }
75            None => bail!("no files found in bucket {bucket} key {key}"),
76            Some(files) => {
77                all_files = files;
78                break;
79            }
80        }
81    }
82
83    let mut rows = vec![];
84    for obj in all_files.iter() {
85        let file = client
86            .get_object()
87            .bucket(&bucket)
88            .key(obj.key().unwrap())
89            .send()
90            .await?;
91        let bytes = file.body.collect().await?.into_bytes();
92
93        let new_rows = match obj.key().unwrap() {
94            key if key.ends_with(".csv") => {
95                let actual_body = str::from_utf8(bytes.as_ref())?;
96                actual_body.lines().map(|l| l.to_string()).collect()
97            }
98            key if key.ends_with(".parquet") => rows_from_parquet(bytes),
99            key => bail!("unexpected file type: {key}"),
100        };
101        rows.extend(new_rows);
102    }
103    if sort_rows {
104        expected_body.sort();
105        rows.sort();
106    }
107    if rows != expected_body {
108        bail!(
109            "content did not match\nexpected:\n{:?}\n\nactual:\n{:?}",
110            expected_body,
111            rows
112        );
113    }
114
115    Ok(ControlFlow::Continue)
116}
117
118pub async fn run_verify_keys(
119    mut cmd: BuiltinCommand,
120    state: &State,
121) -> Result<ControlFlow, anyhow::Error> {
122    let bucket: String = cmd.args.parse("bucket")?;
123    let prefix_path: String = cmd.args.parse("prefix-path")?;
124    let key_pattern: Regex = cmd.args.parse("key-pattern")?;
125    let num_attempts = cmd.args.opt_parse("num-attempts")?.unwrap_or(30);
126    cmd.args.done()?;
127
128    println!("Verifying {key_pattern} in S3 bucket {bucket} path {prefix_path}...");
129
130    let client = mz_aws_util::s3::new_client(&state.aws_config);
131
132    let mut attempts = 0;
133    while attempts <= num_attempts {
134        attempts += 1;
135        let files = client
136            .list_objects_v2()
137            .bucket(&bucket)
138            .prefix(&format!("{}/", prefix_path))
139            .send()
140            .await?;
141        match files.contents {
142            Some(files) => {
143                let files: Vec<_> = files
144                    .iter()
145                    .filter(|obj| key_pattern.is_match(obj.key().unwrap()))
146                    .map(|obj| obj.key().unwrap())
147                    .collect();
148                if !files.is_empty() {
149                    println!("Found matching files: {files:?}");
150                    return Ok(ControlFlow::Continue);
151                }
152            }
153            _ => thread::sleep(Duration::from_secs(1)),
154        }
155    }
156
157    bail!("Did not find matching files in bucket {bucket} prefix {prefix_path}");
158}
159
160fn rows_from_parquet(bytes: bytes::Bytes) -> Vec<String> {
161    let reader =
162        parquet::arrow::arrow_reader::ParquetRecordBatchReader::try_new(bytes, 1_000_000).unwrap();
163
164    let mut ret = vec![];
165    let format_options = FormatOptions::default();
166    for batch in reader {
167        let batch = batch.unwrap();
168        let converters = batch
169            .columns()
170            .iter()
171            .map(|a| ArrayFormatter::try_new(a.as_ref(), &format_options).unwrap())
172            .collect::<Vec<_>>();
173
174        for row_idx in 0..batch.num_rows() {
175            let mut buf = String::new();
176            for (col_idx, converter) in converters.iter().enumerate() {
177                if col_idx > 0 {
178                    buf.push_str(" ");
179                }
180                converter.value(row_idx).write(&mut buf).unwrap();
181            }
182            ret.push(buf);
183        }
184    }
185    ret
186}
187
188pub async fn run_upload(
189    mut cmd: BuiltinCommand,
190    state: &State,
191) -> Result<ControlFlow, anyhow::Error> {
192    let key = cmd.args.string("key")?;
193    let bucket = cmd.args.string("bucket")?;
194
195    let compression = build_compression(&mut cmd)?;
196    let content = build_contents(&mut cmd)?;
197
198    let aws_client = mz_aws_util::s3::new_client(&state.aws_config);
199
200    // TODO(parkmycar): Stream data to S3. The ByteStream type from the AWS config is a bit
201    // cumbersome to work with, so for now just stick with this.
202    let mut body = vec![];
203    for line in content {
204        body.extend(&line);
205        body.push(b'\n');
206    }
207
208    let mut reader: Pin<Box<dyn AsyncRead + Send + Sync>> = match compression {
209        Compression::None => Box::pin(&body[..]),
210        Compression::Gzip => Box::pin(GzipEncoder::new(&body[..])),
211        Compression::Bzip2 => Box::pin(BzEncoder::new(&body[..])),
212        Compression::Xz => Box::pin(XzEncoder::new(&body[..])),
213        Compression::Zstd => Box::pin(ZstdEncoder::new(&body[..])),
214    };
215    let mut content = vec![];
216    reader
217        .read_to_end(&mut content)
218        .await
219        .context("compressing")?;
220
221    println!("Uploading file to S3 bucket {bucket}/{key}");
222
223    // Upload the file to S3.
224    aws_client
225        .put_object()
226        .bucket(&bucket)
227        .key(&key)
228        .body(content.into())
229        .send()
230        .await
231        .context("s3 put")?;
232
233    Ok(ControlFlow::Continue)
234}
235
236pub async fn run_set_presigned_url(
237    mut cmd: BuiltinCommand,
238    state: &mut State,
239) -> Result<ControlFlow, anyhow::Error> {
240    let key = cmd.args.string("key")?;
241    let bucket = cmd.args.string("bucket")?;
242    let var_name = cmd.args.string("var-name")?;
243
244    let aws_client = mz_aws_util::s3::new_client(&state.aws_config);
245    let presign_config = mz_aws_util::s3::new_presigned_config();
246    let request = aws_client
247        .get_object()
248        .bucket(&bucket)
249        .key(&key)
250        .presigned(presign_config)
251        .await
252        .context("s3 presign")?;
253
254    println!("Setting '{var_name}' to presigned URL for {bucket}/{key}");
255    state.cmd_vars.insert(var_name, request.uri().to_string());
256
257    Ok(ControlFlow::Continue)
258}