mz_testdrive/action/
s3.rs
1use std::pin::Pin;
11use std::str;
12use std::thread;
13use std::time::Duration;
14
15use anyhow::Context;
16use anyhow::bail;
17use arrow::util::display::ArrayFormatter;
18use arrow::util::display::FormatOptions;
19use async_compression::tokio::bufread::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder};
20use regex::Regex;
21use tokio::io::{AsyncRead, AsyncReadExt};
22
23use crate::action::file::Compression;
24use crate::action::file::build_compression;
25use crate::action::file::build_contents;
26use crate::action::{ControlFlow, State};
27use crate::parser::BuiltinCommand;
28
29pub async fn run_verify_data(
30 mut cmd: BuiltinCommand,
31 state: &State,
32) -> Result<ControlFlow, anyhow::Error> {
33 let mut expected_body = cmd
34 .input
35 .into_iter()
36 .map(|line| {
38 line.trim_end_matches("// allow-trailing-whitespace")
39 .to_string()
40 })
41 .collect::<Vec<String>>();
42 let bucket: String = cmd.args.parse("bucket")?;
43 let key: String = cmd.args.parse("key")?;
44 let sort_rows = cmd.args.opt_bool("sort-rows")?.unwrap_or(false);
45 cmd.args.done()?;
46
47 println!("Verifying contents of S3 bucket {bucket} key {key}...");
48
49 let client = mz_aws_util::s3::new_client(&state.aws_config);
50
51 let mut attempts = 0;
54 let all_files;
55 loop {
56 attempts += 1;
57 if attempts > 10 {
58 bail!("found incomplete sentinel file in path {key} after 10 attempts")
59 }
60
61 let files = client
62 .list_objects_v2()
63 .bucket(&bucket)
64 .prefix(&format!("{}/", key))
65 .send()
66 .await?;
67 match files.contents {
68 Some(files)
69 if files
70 .iter()
71 .any(|obj| obj.key().map_or(false, |key| key.contains("INCOMPLETE"))) =>
72 {
73 thread::sleep(Duration::from_secs(1))
74 }
75 None => bail!("no files found in bucket {bucket} key {key}"),
76 Some(files) => {
77 all_files = files;
78 break;
79 }
80 }
81 }
82
83 let mut rows = vec![];
84 for obj in all_files.iter() {
85 let file = client
86 .get_object()
87 .bucket(&bucket)
88 .key(obj.key().unwrap())
89 .send()
90 .await?;
91 let bytes = file.body.collect().await?.into_bytes();
92
93 let new_rows = match obj.key().unwrap() {
94 key if key.ends_with(".csv") => {
95 let actual_body = str::from_utf8(bytes.as_ref())?;
96 actual_body.lines().map(|l| l.to_string()).collect()
97 }
98 key if key.ends_with(".parquet") => rows_from_parquet(bytes),
99 key => bail!("unexpected file type: {key}"),
100 };
101 rows.extend(new_rows);
102 }
103 if sort_rows {
104 expected_body.sort();
105 rows.sort();
106 }
107 if rows != expected_body {
108 bail!(
109 "content did not match\nexpected:\n{:?}\n\nactual:\n{:?}",
110 expected_body,
111 rows
112 );
113 }
114
115 Ok(ControlFlow::Continue)
116}
117
118pub async fn run_verify_keys(
119 mut cmd: BuiltinCommand,
120 state: &State,
121) -> Result<ControlFlow, anyhow::Error> {
122 let bucket: String = cmd.args.parse("bucket")?;
123 let prefix_path: String = cmd.args.parse("prefix-path")?;
124 let key_pattern: Regex = cmd.args.parse("key-pattern")?;
125 let num_attempts = cmd.args.opt_parse("num-attempts")?.unwrap_or(30);
126 cmd.args.done()?;
127
128 println!("Verifying {key_pattern} in S3 bucket {bucket} path {prefix_path}...");
129
130 let client = mz_aws_util::s3::new_client(&state.aws_config);
131
132 let mut attempts = 0;
133 while attempts <= num_attempts {
134 attempts += 1;
135 let files = client
136 .list_objects_v2()
137 .bucket(&bucket)
138 .prefix(&format!("{}/", prefix_path))
139 .send()
140 .await?;
141 match files.contents {
142 Some(files) => {
143 let files: Vec<_> = files
144 .iter()
145 .filter(|obj| key_pattern.is_match(obj.key().unwrap()))
146 .map(|obj| obj.key().unwrap())
147 .collect();
148 if !files.is_empty() {
149 println!("Found matching files: {files:?}");
150 return Ok(ControlFlow::Continue);
151 }
152 }
153 _ => thread::sleep(Duration::from_secs(1)),
154 }
155 }
156
157 bail!("Did not find matching files in bucket {bucket} prefix {prefix_path}");
158}
159
160fn rows_from_parquet(bytes: bytes::Bytes) -> Vec<String> {
161 let reader =
162 parquet::arrow::arrow_reader::ParquetRecordBatchReader::try_new(bytes, 1_000_000).unwrap();
163
164 let mut ret = vec![];
165 let format_options = FormatOptions::default();
166 for batch in reader {
167 let batch = batch.unwrap();
168 let converters = batch
169 .columns()
170 .iter()
171 .map(|a| ArrayFormatter::try_new(a.as_ref(), &format_options).unwrap())
172 .collect::<Vec<_>>();
173
174 for row_idx in 0..batch.num_rows() {
175 let mut buf = String::new();
176 for (col_idx, converter) in converters.iter().enumerate() {
177 if col_idx > 0 {
178 buf.push_str(" ");
179 }
180 converter.value(row_idx).write(&mut buf).unwrap();
181 }
182 ret.push(buf);
183 }
184 }
185 ret
186}
187
188pub async fn run_upload(
189 mut cmd: BuiltinCommand,
190 state: &State,
191) -> Result<ControlFlow, anyhow::Error> {
192 let key = cmd.args.string("key")?;
193 let bucket = cmd.args.string("bucket")?;
194
195 let compression = build_compression(&mut cmd)?;
196 let content = build_contents(&mut cmd)?;
197
198 let aws_client = mz_aws_util::s3::new_client(&state.aws_config);
199
200 let mut body = vec![];
203 for line in content {
204 body.extend(&line);
205 body.push(b'\n');
206 }
207
208 let mut reader: Pin<Box<dyn AsyncRead + Send + Sync>> = match compression {
209 Compression::None => Box::pin(&body[..]),
210 Compression::Gzip => Box::pin(GzipEncoder::new(&body[..])),
211 Compression::Bzip2 => Box::pin(BzEncoder::new(&body[..])),
212 Compression::Xz => Box::pin(XzEncoder::new(&body[..])),
213 Compression::Zstd => Box::pin(ZstdEncoder::new(&body[..])),
214 };
215 let mut content = vec![];
216 reader
217 .read_to_end(&mut content)
218 .await
219 .context("compressing")?;
220
221 println!("Uploading file to S3 bucket {bucket}/{key}");
222
223 aws_client
225 .put_object()
226 .bucket(&bucket)
227 .key(&key)
228 .body(content.into())
229 .send()
230 .await
231 .context("s3 put")?;
232
233 Ok(ControlFlow::Continue)
234}
235
236pub async fn run_set_presigned_url(
237 mut cmd: BuiltinCommand,
238 state: &mut State,
239) -> Result<ControlFlow, anyhow::Error> {
240 let key = cmd.args.string("key")?;
241 let bucket = cmd.args.string("bucket")?;
242 let var_name = cmd.args.string("var-name")?;
243
244 let aws_client = mz_aws_util::s3::new_client(&state.aws_config);
245 let presign_config = mz_aws_util::s3::new_presigned_config();
246 let request = aws_client
247 .get_object()
248 .bucket(&bucket)
249 .key(&key)
250 .presigned(presign_config)
251 .await
252 .context("s3 presign")?;
253
254 println!("Setting '{var_name}' to presigned URL for {bucket}/{key}");
255 state.cmd_vars.insert(var_name, request.uri().to_string());
256
257 Ok(ControlFlow::Continue)
258}