// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use anyhow::anyhow;
use aws_types::sdk_config::SdkConfig;
use mz_aws_util::s3_uploader::{
    CompletedUpload, S3MultiPartUploadError, S3MultiPartUploader, S3MultiPartUploaderConfig,
};
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_ore::task::JoinHandleExt;
use mz_pgcopy::{CopyFormatParams, encode_copy_format, encode_copy_format_header};
use mz_repr::{GlobalId, RelationDesc, Row};
use mz_storage_types::sinks::s3_oneshot_sink::S3KeyManager;
use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
use tracing::info;

use super::{CopyToParameters, CopyToS3Uploader};

/// Required state to upload batches to S3
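///
/// A minimal sketch of the calling sequence, with placeholder bindings
/// (`sdk_config`, `upload_info`, and friends); see `test_multiple_files` at
/// the bottom of this file for a runnable version against a real bucket:
///
/// ```ignore
/// let mut uploader = PgCopyUploader::new(sdk_config, upload_info, &sink_id, batch, params)?;
/// uploader.append_row(&row).await?;
/// uploader.finish().await?;
/// ```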
pub(super) struct PgCopyUploader {
    /// The output description.
    desc: RelationDesc,
    /// Params to format the data.
    format: CopyFormatParams<'static>,
    /// The index of the current file within the batch.
    file_index: usize,
    /// Provides the appropriate bucket and object keys to use for uploads.
    key_manager: S3KeyManager,
    /// Identifies the batch that files uploaded by this uploader belong to.
    batch: u64,
    /// The desired file size. A new file upload will be started
    /// when the size exceeds this amount.
    max_file_size: u64,
    /// The AWS SDK config.
    /// This is an `Option` so that we can get an owned value later to move
    /// into a spawned tokio task.
    sdk_config: Option<SdkConfig>,
    /// Multi-part uploader for the current file.
    /// Kept in an `Option` so that an owned value can be taken later.
    current_file_uploader: Option<S3MultiPartUploader>,
    /// Upload parameters.
    params: CopyToParameters,
}

impl CopyToS3Uploader for PgCopyUploader {
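    /// Creates an uploader for a single batch of files. Returns an error if
    /// the sink's format is anything other than `S3SinkFormat::PgCopy`.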
    fn new(
        sdk_config: SdkConfig,
        connection_details: S3UploadInfo,
        sink_id: &GlobalId,
        batch: u64,
        params: CopyToParameters,
    ) -> Result<PgCopyUploader, anyhow::Error> {
        match connection_details.format {
            S3SinkFormat::PgCopy(format_params) => Ok(PgCopyUploader {
                desc: connection_details.desc,
                sdk_config: Some(sdk_config),
                format: format_params,
                key_manager: S3KeyManager::new(sink_id, &connection_details.uri),
                batch,
                max_file_size: connection_details.max_file_size,
                file_index: 0,
                current_file_uploader: None,
                params,
            }),
            _ => anyhow::bail!("Expected PgCopy format"),
        }
    }

    /// Finishes any remaining in-progress upload.
    async fn finish(&mut self) -> Result<(), anyhow::Error> {
        if let Some(uploader) = self.current_file_uploader.take() {
            // Move the AWS S3 calls onto a tokio task instead of running them
            // on the timely runtime.
            let handle =
                mz_ore::task::spawn(|| "s3_uploader::finish", async { uploader.finish().await });
            let CompletedUpload {
                part_count,
                total_bytes_uploaded,
                bucket,
                key,
            } = handle.wait_and_assert_finished().await?;
            info!(
                "finished upload: bucket {}, key {}, bytes_uploaded {}, parts_uploaded {}",
                bucket, key, total_bytes_uploaded, part_count
            );
        }
        Ok(())
    }

    /// Appends the row to the in-progress upload, where it is buffered until it
    /// reaches the configured `part_size_limit`, at which point the
    /// `S3MultiPartUploader` uploads that part. If the row would push the ongoing
    /// upload over its max file size, a new `S3MultiPartUploader` is created for a
    /// new file and the row is appended there instead.
    async fn append_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
        let mut buf: Vec<u8> = vec![];
        // Encode the row and write it to a temporary buffer.
        encode_copy_format(&self.format, row, self.desc.typ(), &mut buf)
            .map_err(|_| anyhow!("error encoding row"))?;

        if self.current_file_uploader.is_none() {
            self.start_new_file_upload().await?;
        }
        let mut uploader = self.current_file_uploader.as_mut().expect("known exists");

        match uploader.buffer_chunk(&buf) {
            Ok(_) => Ok(()),
            Err(S3MultiPartUploadError::UploadExceedsMaxFileLimit(_)) => {
                // Start a multi-part upload of the next file.
                self.start_new_file_upload().await?;
                uploader = self.current_file_uploader.as_mut().expect("known exists");
                uploader.buffer_chunk(&buf)?;
                Ok(())
            }
            Err(e) => Err(e.into()),
        }
    }
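
    /// Starts a new file for subsequent appends by opening a fresh multi-part
    /// upload, finishing the current one first if it exists.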
    async fn force_new_file(&mut self) -> Result<(), anyhow::Error> {
        self.start_new_file_upload().await
    }
}

impl PgCopyUploader {
    /// Creates the uploader for the next file and starts the multi-part upload.
    async fn start_new_file_upload(&mut self) -> Result<(), anyhow::Error> {
        self.finish().await?;
        assert_none!(self.current_file_uploader);

        self.file_index += 1;
        let object_key =
            self.key_manager
                .data_key(self.batch, self.file_index, self.format.file_extension());
        let bucket = self.key_manager.bucket.clone();
        info!("starting upload: bucket {}, key {}", &bucket, &object_key);
        let sdk_config = self
            .sdk_config
            .take()
            .expect("sdk_config should always be present");
        let max_file_size = self.max_file_size;
        // Move the AWS S3 calls onto a tokio task instead of running them on
        // the timely runtime.
        let part_size_limit = u64::cast_from(self.params.s3_multipart_part_size_bytes);
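        // The spawned task takes ownership of the SDK config and returns it
        // alongside the uploader so it can be reused for the next file.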
        let handle = mz_ore::task::spawn(|| "s3_uploader::try_new", async move {
            let uploader = S3MultiPartUploader::try_new(
                &sdk_config,
                bucket,
                object_key,
                S3MultiPartUploaderConfig {
                    part_size_limit,
                    file_size_limit: max_file_size,
                },
            )
            .await;
            (uploader, sdk_config)
        });
        let (uploader, sdk_config) = handle.wait_and_assert_finished().await;
        self.sdk_config = Some(sdk_config);
        let mut uploader = uploader?;
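        // If the format requires a header (e.g. a CSV header row), encode it
        // and buffer it as the first chunk of the new file.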
        if self.format.requires_header() {
            let mut buf: Vec<u8> = vec![];
            encode_copy_format_header(&self.format, &self.desc, &mut buf)
                .map_err(|_| anyhow!("error encoding header"))?;
            uploader.buffer_chunk(&buf)?;
        }
        self.current_file_uploader = Some(uploader);
        Ok(())
    }
}

/// On CI, these tests are enabled by adding the scratch-aws-access plugin
/// to the `cargo-test` step in `ci/test/pipeline.template.yml` and setting
/// `MZ_S3_UPLOADER_TEST_S3_BUCKET` in `ci/test/cargo-test/mzcompose.py`.
///
/// To opt in to these tests locally as a Materialize developer, follow the
/// AWS access guide:
///
/// ```text
/// https://www.notion.so/materialize/AWS-access-5fbd9513dcdc4e11a7591e8caa5f63fe
/// ```
///
/// and then run `source src/aws-util/src/setup_test_env_mz.sh`. You will also
/// have to run `aws sso login` if you haven't done so recently.
#[cfg(test)]
mod tests {
    use bytesize::ByteSize;
    use mz_pgcopy::CopyFormatParams;
    use mz_repr::{ColumnName, ColumnType, Datum, RelationType};
    use uuid::Uuid;

    use super::*;

    fn s3_bucket_path_for_test() -> Option<(String, String)> {
        let bucket = match std::env::var("MZ_S3_UPLOADER_TEST_S3_BUCKET") {
            Ok(bucket) => bucket,
            Err(_) => {
                if mz_ore::env::is_var_truthy("CI") {
                    panic!("CI is supposed to run this test but something has gone wrong!");
                }
                return None;
            }
        };

        let prefix = Uuid::new_v4().to_string();
        let path = format!("cargo_test/{}/file", prefix);
        Some((bucket, path))
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(coverage, ignore)] // https://github.com/MaterializeInc/database-issues/issues/5586
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_method` on OS `linux`
    #[ignore] // TODO: Reenable against minio so it can run locally
    async fn test_multiple_files() -> Result<(), anyhow::Error> {
        let sdk_config = mz_aws_util::defaults().load().await;
        let (bucket, path) = match s3_bucket_path_for_test() {
            Some(tuple) => tuple,
            None => return Ok(()),
        };
        let sink_id = GlobalId::User(123);
        let batch = 456;
        let typ: RelationType = RelationType::new(vec![ColumnType {
            scalar_type: mz_repr::ScalarType::String,
            nullable: true,
        }]);
        let column_names = vec![ColumnName::from("col1")];
        let desc = RelationDesc::new(typ, column_names.into_iter());
        let mut uploader = PgCopyUploader::new(
            sdk_config.clone(),
            S3UploadInfo {
                uri: format!("s3://{}/{}", bucket, path),
                // This is only for testing; users will not be able to set a value smaller than 16MB.
                max_file_size: ByteSize::b(6).as_u64(),
                desc,
                format: S3SinkFormat::PgCopy(CopyFormatParams::Csv(Default::default())),
            },
            &sink_id,
            batch,
            CopyToParameters {
                s3_multipart_part_size_bytes: 10 * 1024 * 1024,
                arrow_builder_buffer_ratio: 100,
                parquet_row_group_ratio: 100,
            },
        )?;
        let mut row = Row::default();
        // Even though this will exceed max_file_size, it should be successfully uploaded in a single file.
        row.packer().push(Datum::from("1234567"));
        uploader.append_row(&row).await?;

        // Since the max_file_size is 6 bytes, this row will be uploaded to a new file.
        row.packer().push(Datum::Null);
        uploader.append_row(&row).await?;

        row.packer().push(Datum::from("5678"));
        uploader.append_row(&row).await?;

        uploader.finish().await?;

        // Based on the max_file_size, the uploader should have uploaded two
        // files, with keys ending in `-0001.csv` and `-0002.csv`.
        let s3_client = mz_aws_util::s3::new_client(&sdk_config);
        let first_file = s3_client
            .get_object()
            .bucket(bucket.clone())
            .key(format!(
                "{}/mz-{}-batch-{:04}-0001.csv",
                path, sink_id, batch
            ))
            .send()
            .await
            .unwrap();

        let body = first_file.body.collect().await.unwrap().into_bytes();
        let expected_body: &[u8] = b"1234567\n";
        assert_eq!(body, *expected_body);

        let second_file = s3_client
            .get_object()
            .bucket(bucket)
            .key(format!(
                "{}/mz-{}-batch-{:04}-0002.csv",
                path, sink_id, batch
            ))
            .send()
            .await
            .unwrap();

        let body = second_file.body.collect().await.unwrap().into_bytes();
        let expected_body: &[u8] = b"\n5678\n";
        assert_eq!(body, *expected_body);

        Ok(())
    }
}