mz_aws_util/s3_uploader.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use anyhow::anyhow;
use aws_sdk_s3::Client;
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError;
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError;
use aws_sdk_s3::operation::upload_part::UploadPartError;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
use aws_types::sdk_config::SdkConfig;
use bytes::{Bytes, BytesMut};
use bytesize::ByteSize;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::task::{JoinHandle, JoinHandleExt, spawn};

/// A multi-part uploader which can upload a single object across multiple parts
/// and keeps track of state to eventually finish the upload process.
/// The caller does not need to know the final number of parts beforehand.
///
/// The caller should get an instance by calling `S3MultiPartUploader::try_new` first.
/// Data can then be added by calling `buffer_chunk` one or more times, and the
/// multi-part upload is eventually finished by calling the `finish` method.
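///
/// A minimal usage sketch; the bucket, key, and `sdk_config` below are
/// placeholders, and the block is marked `ignore` since running it needs live
/// AWS credentials:
///
/// ```ignore
/// let mut uploader = S3MultiPartUploader::try_new(
///     &sdk_config, // an `aws_types::sdk_config::SdkConfig`
///     "some-bucket".to_string(),
///     "some/key".to_string(),
///     S3MultiPartUploaderConfig::default(),
/// )
/// .await?;
/// uploader.buffer_chunk(b"hello")?;
/// let completed = uploader.finish().await?;
/// assert_eq!(completed.part_count, 1);
/// ```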
#[derive(Debug)]
pub struct S3MultiPartUploader {
    client: Client,
    // Config settings for this particular multi-part upload.
    config: S3MultiPartUploaderConfig,
    // The s3 bucket.
    bucket: String,
    // The s3 key of the file being uploaded.
    key: String,
    // The upload ID for the ongoing multi-part upload.
    upload_id: String,
    // The current part count.
    part_count: i32,
    // Number of bytes uploaded so far.
    total_bytes_uploaded: u64,
    // A buffer to accumulate data until it reaches `part_size_limit` in size, at
    // which point it will be uploaded as a part of the multi-part upload.
    buffer: BytesMut,
    // The task handles for each part upload.
    upload_handles: Vec<JoinHandle<Result<(Option<String>, i32), S3MultiPartUploadError>>>,
}

/// The largest allowable part number (inclusive).
///
/// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
pub const AWS_S3_MAX_PART_COUNT: i32 = 10_000;
/// The minimum size of a part in a multipart upload.
///
/// This minimum doesn't apply to the last chunk, which can be any size.
///
/// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
const AWS_S3_MIN_PART_SIZE: ByteSize = ByteSize::mib(5);
/// The maximum size of a part in a multipart upload.
///
/// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
const AWS_S3_MAX_PART_SIZE: ByteSize = ByteSize::gib(5);
/// The maximum size of an object in s3.
///
/// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
const AWS_S3_MAX_OBJECT_SIZE: ByteSize = ByteSize::tib(5);

/// Information about a completed multi-part upload after `finish` is called.
#[derive(Debug)]
pub struct CompletedUpload {
    /// The total number of parts in the multi-part upload.
    pub part_count: u32,
    /// The total number of bytes uploaded in the multi-part upload.
    pub total_bytes_uploaded: u64,
    /// The s3 bucket the object was uploaded to.
    pub bucket: String,
    /// The s3 key of the uploaded object.
    pub key: String,
}

/// Configuration object to configure the behaviour of the `S3MultiPartUploader`.
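///
/// For example, a config for uploading in 8MiB parts up to a 1GiB object
/// (illustrative values, not recommendations):
///
/// ```ignore
/// let config = S3MultiPartUploaderConfig {
///     part_size_limit: ByteSize::mib(8).as_u64(),
///     file_size_limit: ByteSize::gib(1).as_u64(),
/// };
/// ```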
#[derive(Debug)]
pub struct S3MultiPartUploaderConfig {
    /// Size of data buffered in memory before being uploaded as a part.
    pub part_size_limit: u64,
    /// The max file size of the file uploaded to s3 by an `S3MultiPartUploader` instance.
    pub file_size_limit: u64,
}

impl S3MultiPartUploaderConfig {
    /// A reasonable default for the maximum file size which this uploader can
    /// upload. Can be overridden via `file_size_limit`.
    const DEFAULT_MAX_FILE_SIZE: ByteSize = ByteSize::gib(5);
    /// A reasonable default for the part size. Can be overridden via
    /// `part_size_limit`.
    const DEFAULT_PART_SIZE_LIMIT: ByteSize = ByteSize::mib(10);

    /// As per S3 limits, the part size cannot be less than 5MiB and cannot
    /// exceed 5GiB, and the object size cannot exceed 5TiB.
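    ///
    /// For example, a 1MiB part size is rejected because it is below S3's 5MiB
    /// minimum (illustrative values):
    ///
    /// ```ignore
    /// let config = S3MultiPartUploaderConfig {
    ///     part_size_limit: ByteSize::mib(1).as_u64(),
    ///     file_size_limit: ByteSize::gib(1).as_u64(),
    /// };
    /// assert!(config.validate().is_err());
    /// ```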
    fn validate(&self) -> Result<(), anyhow::Error> {
        let S3MultiPartUploaderConfig {
            part_size_limit,
            file_size_limit,
        } = self;
        if *part_size_limit < AWS_S3_MIN_PART_SIZE.as_u64()
            || *part_size_limit > AWS_S3_MAX_PART_SIZE.as_u64()
        {
            return Err(anyhow!(
                "invalid part size: {}, should be between {} and {} bytes",
                part_size_limit,
                AWS_S3_MIN_PART_SIZE.as_u64(),
                AWS_S3_MAX_PART_SIZE.as_u64()
            ));
        }
        if *file_size_limit > AWS_S3_MAX_OBJECT_SIZE.as_u64() {
            return Err(anyhow!(
                "invalid file size: {}, cannot exceed {} bytes",
                file_size_limit,
                AWS_S3_MAX_OBJECT_SIZE.as_u64()
            ));
        }
        let max_parts_count: u64 = AWS_S3_MAX_PART_COUNT.try_into().expect("i32 to u64");
        // Using `div_ceil` because we want the fraction to be rounded up, i.e. 4.5 should
        // be rounded to 5 instead of 4, to accurately get the required number of parts.
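        // For example, a 45MiB `file_size_limit` with a 10MiB `part_size_limit`
        // needs `ceil(45 / 10) = 5` parts; plain integer division would say 4.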
        let estimated_parts_count = file_size_limit.div_ceil(*part_size_limit);
        if estimated_parts_count > max_parts_count {
            return Err(anyhow!(
                "total number of possible parts (file_size_limit / part_size_limit): {}, cannot exceed {}",
                estimated_parts_count,
                AWS_S3_MAX_PART_COUNT
            ));
        }
        Ok(())
    }
}

impl Default for S3MultiPartUploaderConfig {
    fn default() -> Self {
        Self {
            part_size_limit: Self::DEFAULT_PART_SIZE_LIMIT.as_u64(),
            file_size_limit: Self::DEFAULT_MAX_FILE_SIZE.as_u64(),
        }
    }
}

impl S3MultiPartUploader {
    /// Creates an instance of `S3MultiPartUploader` for the given `bucket` and `key`.
    /// This starts the multi-part upload by making a `create_multipart_upload` call, and
    /// initializes all the internal state required to track the ongoing upload.
    pub async fn try_new(
        sdk_config: &SdkConfig,
        bucket: String,
        key: String,
        config: S3MultiPartUploaderConfig,
    ) -> Result<S3MultiPartUploader, S3MultiPartUploadError> {
        // Validate the config.
        config.validate()?;

        let client = crate::s3::new_client(sdk_config);
        let res = client
            .create_multipart_upload()
            .bucket(&bucket)
            .key(&key)
            .send()
            .await?;
        let upload_id = res
            .upload_id()
            .ok_or_else(|| anyhow!("create_multipart_upload response missing upload id"))?
            .to_string();
        Ok(S3MultiPartUploader {
            client,
            bucket,
            key,
            upload_id,
            part_count: 0,
            total_bytes_uploaded: 0,
            buffer: Default::default(),
            config,
            upload_handles: Default::default(),
        })
    }

    /// Adds the `data` to the internal buffer and flushes the buffer if it exceeds
    /// the part threshold defined in `S3MultiPartUploaderConfig`.
    ///
    /// Returns an `UploadExceedsMaxFileLimit` error if the upload would exceed the
    /// configured `file_size_limit`, unless no data has been added yet, in which case
    /// it will still attempt the upload if the data size is under `part_size_limit` * 10000.
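    ///
    /// A sketch of the buffering behaviour, assuming the default 10MiB
    /// `part_size_limit` (sizes here are illustrative):
    ///
    /// ```ignore
    /// // Buffering 25MiB kicks off uploads for two 10MiB parts and
    /// // leaves 5MiB buffered until the next chunk or `finish`.
    /// uploader.buffer_chunk(&vec![0u8; 25 * 1024 * 1024])?;
    /// ```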
    pub fn buffer_chunk(&mut self, data: &[u8]) -> Result<(), S3MultiPartUploadError> {
        let data_len = u64::cast_from(data.len());

        let aws_max_part_count: u64 = AWS_S3_MAX_PART_COUNT.try_into().expect("i32 to u64");
        let absolute_max_file_limit = std::cmp::min(
            self.config.part_size_limit * aws_max_part_count,
            AWS_S3_MAX_OBJECT_SIZE.as_u64(),
        );

        // If no data has been uploaded yet, we can still do an upload up to
        // `absolute_max_file_limit`.
        let can_force_first_upload = self.added_bytes() == 0 && data_len <= absolute_max_file_limit;

        if data_len <= self.remaining_bytes_limit() || can_force_first_upload {
            self.buffer.extend_from_slice(data);
            self.flush_chunks()?;
            Ok(())
        } else {
            Err(S3MultiPartUploadError::UploadExceedsMaxFileLimit(
                self.config.file_size_limit,
            ))
        }
    }

    /// Finishes the multi-part upload.
    ///
    /// Returns the number of parts and number of bytes uploaded.
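    ///
    /// For example, destructuring the returned summary (a sketch, continuing
    /// the usage example on the struct):
    ///
    /// ```ignore
    /// let CompletedUpload {
    ///     part_count,
    ///     total_bytes_uploaded,
    ///     bucket,
    ///     key,
    /// } = uploader.finish().await?;
    /// ```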
    pub async fn finish(mut self) -> Result<CompletedUpload, S3MultiPartUploadError> {
        let remaining = self.buffer.split();
        self.upload_part_internal(remaining.freeze())?;

        let mut parts: Vec<CompletedPart> = Vec::with_capacity(self.upload_handles.len());
        for handle in self.upload_handles {
            let (etag, part_num) = handle.wait_and_assert_finished().await?;
            match etag {
                Some(etag) => {
                    parts.push(
                        CompletedPart::builder()
                            .e_tag(etag)
                            .part_number(part_num)
                            .build(),
                    );
                }
                None => Err(anyhow!("etag for part {part_num} is None"))?,
            }
        }

        self.client
            .complete_multipart_upload()
            .bucket(&self.bucket)
            .key(&self.key)
            .upload_id(self.upload_id.clone())
            .multipart_upload(
                CompletedMultipartUpload::builder()
                    .set_parts(Some(parts))
                    .build(),
            )
            .send()
            .await?;
        Ok(CompletedUpload {
            part_count: self.part_count.try_into().expect("i32 to u32"),
            total_bytes_uploaded: self.total_bytes_uploaded,
            bucket: self.bucket,
            key: self.key,
        })
    }

    /// Internal method, returns the number of bytes currently in the buffer.
    fn buffer_size(&self) -> u64 {
        u64::cast_from(self.buffer.len())
    }

    /// Internal method, returns the number of bytes which can still be added to the
    /// multi-part upload without exceeding `file_size_limit`.
    fn remaining_bytes_limit(&self) -> u64 {
        self.config
            .file_size_limit
            .saturating_sub(self.added_bytes())
    }

    /// Returns the number of bytes processed so far.
    pub fn added_bytes(&self) -> u64 {
        self.total_bytes_uploaded + self.buffer_size()
    }

    /// Internal method to continuously flush the buffer and upload parts from it
    /// until its size is under the configured `part_size_limit`.
    fn flush_chunks(&mut self) -> Result<(), S3MultiPartUploadError> {
        let part_size_limit = self.config.part_size_limit;
        // TODO (mouli): can probably parallelize the calls here.
        while self.buffer_size() > part_size_limit {
            let data = self.buffer.split_to(usize::cast_from(part_size_limit));
            self.upload_part_internal(data.freeze())?;
        }
        Ok(())
    }

    /// Internal method which actually uploads a single part and updates state.
    fn upload_part_internal(&mut self, data: Bytes) -> Result<(), S3MultiPartUploadError> {
        let num_of_bytes: u64 = u64::cast_from(data.len());

        let next_part_number = self.part_count + 1;
        if next_part_number > AWS_S3_MAX_PART_COUNT {
            return Err(S3MultiPartUploadError::ExceedsMaxPartNumber);
        }
        let client = self.client.clone();
        let bucket = self.bucket.clone();
        let key = self.key.clone();
        let upload_id = self.upload_id.clone();

        let handle = spawn(|| "s3::upload_part", async move {
            let res = client
                .upload_part()
                .bucket(&bucket)
                .key(&key)
                .upload_id(upload_id)
                .part_number(next_part_number)
                .body(ByteStream::from(data))
                .send()
                .await?;
            Ok((res.e_tag, next_part_number))
        });
        self.upload_handles.push(handle);

        self.part_count = next_part_number;
        self.total_bytes_uploaded += num_of_bytes;
        Ok(())
    }
}

/// The error type returned by the `S3MultiPartUploader`.
#[derive(thiserror::Error, Debug)]
pub enum S3MultiPartUploadError {
    #[error(
        "multi-part upload cannot have more than {} parts",
        AWS_S3_MAX_PART_COUNT
    )]
    ExceedsMaxPartNumber,
    #[error("multi-part upload will exceed configured file_size_limit: {} bytes", .0)]
    UploadExceedsMaxFileLimit(u64),
    #[error("{}", .0.display_with_causes())]
    CreateMultipartUploadError(#[from] SdkError<CreateMultipartUploadError>),
    #[error("{}", .0.display_with_causes())]
    UploadPartError(#[from] SdkError<UploadPartError>),
    #[error("{}", .0.display_with_causes())]
    CompleteMultipartUploadError(#[from] SdkError<CompleteMultipartUploadError>),
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}

/// On CI, these tests are enabled by adding the scratch-aws-access plugin
/// to the `cargo-test` step in `ci/test/pipeline.template.yml` and setting
/// `MZ_S3_UPLOADER_TEST_S3_BUCKET` in
/// `ci/test/cargo-test/mzcompose.py`.
///
/// For a Materialize developer, to opt in to these tests locally for
/// development, follow the AWS access guide:
///
/// ```text
/// https://www.notion.so/materialize/AWS-access-5fbd9513dcdc4e11a7591e8caa5f63fe
/// ```
///
/// then run `source src/aws-util/src/setup_test_env_mz.sh`. You will also have
/// to run `aws sso login` if you haven't recently.
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use uuid::Uuid;

    use super::*;
    use crate::{defaults, s3};

    fn s3_bucket_key_for_test() -> Option<(String, String)> {
        let bucket = match std::env::var("MZ_S3_UPLOADER_TEST_S3_BUCKET") {
            Ok(bucket) => bucket,
            Err(_) => {
                if mz_ore::env::is_var_truthy("CI") {
                    panic!("CI is supposed to run this test but something has gone wrong!");
                }
                return None;
            }
        };

        let prefix = Uuid::new_v4().to_string();
        let key = format!("cargo_test/{}/file", prefix);
        Some((bucket, key))
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(coverage, ignore)] // https://github.com/MaterializeInc/database-issues/issues/5586
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_method` on OS `linux`
    #[ignore] // TODO: Reenable against minio so it can run locally
    async fn multi_part_upload_success() -> Result<(), S3MultiPartUploadError> {
        let sdk_config = defaults().load().await;
        let (bucket, key) = match s3_bucket_key_for_test() {
            Some(tuple) => tuple,
            None => return Ok(()),
        };

        let config = S3MultiPartUploaderConfig::default();
        let mut uploader =
            S3MultiPartUploader::try_new(&sdk_config, bucket.clone(), key.clone(), config).await?;

        let expected_data = "onetwothree";
        uploader.buffer_chunk(b"one")?;
        uploader.buffer_chunk(b"two")?;
        uploader.buffer_chunk(b"three")?;

        // This should result in a single part upload.
        let CompletedUpload {
            part_count,
            total_bytes_uploaded,
            bucket: _,
            key: _,
        } = uploader.finish().await?;

        // Getting the uploaded object from s3 and validating the contents.
        let s3_client = s3::new_client(&sdk_config);
        let uploaded_object = s3_client
            .get_object()
            .bucket(bucket)
            .key(key)
            .part_number(1) // Fetching a particular part, so that `parts_count` is populated in the result.
            .send()
            .await
            .unwrap();

        let uploaded_parts_count: u32 = uploaded_object.parts_count().unwrap().try_into().unwrap();
        assert_eq!(uploaded_parts_count, part_count);
        assert_eq!(part_count, 1);

        let body = uploaded_object.body.collect().await.unwrap().into_bytes();
        assert_eq!(body, expected_data);

        let expected_bytes: u64 = Bytes::from(expected_data).len().try_into().unwrap();
        assert_eq!(total_bytes_uploaded, expected_bytes);

        Ok(())
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(coverage, ignore)] // https://github.com/MaterializeInc/database-issues/issues/5586
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_method` on OS `linux`
    #[ignore] // TODO: Reenable against minio so it can run locally
    async fn multi_part_upload_buffer() -> Result<(), S3MultiPartUploadError> {
        let sdk_config = defaults().load().await;
        let (bucket, key) = match s3_bucket_key_for_test() {
            Some(tuple) => tuple,
            None => return Ok(()),
        };

        let config = S3MultiPartUploaderConfig {
            part_size_limit: ByteSize::mib(5).as_u64(),
            file_size_limit: ByteSize::mib(10).as_u64(),
        };
        let mut uploader =
            S3MultiPartUploader::try_new(&sdk_config, bucket.clone(), key.clone(), config).await?;

        // Adding a chunk of 6MiB should trigger a part upload, since part_size_limit is 5MiB.
        let expected_data = vec![97; 6291456]; // 6MiB
        let expected_bytes: u64 = u64::cast_from(expected_data.len());
        uploader.buffer_chunk(&expected_data)?;

        assert_eq!(uploader.remaining_bytes_limit(), ByteSize::mib(4).as_u64());

        // Adding another 6MiB should return an error, since file_size_limit is 10MiB.
        let error = uploader.buffer_chunk(&expected_data).unwrap_err();
        assert!(matches!(
            error,
            S3MultiPartUploadError::UploadExceedsMaxFileLimit(_)
        ));

        let CompletedUpload {
            part_count,
            total_bytes_uploaded,
            bucket: _,
            key: _,
        } = uploader.finish().await?;

        // Getting the uploaded object from s3 and validating the contents.
        let s3_client = s3::new_client(&sdk_config);
        let uploaded_object = s3_client
            .get_object()
            .bucket(bucket)
            .key(key)
            .send()
            .await
            .unwrap();

        assert_eq!(part_count, 2); // 6MiB should be split into two parts: 5MiB and 1MiB.

        let body = uploaded_object.body.collect().await.unwrap().into_bytes();
        assert_eq!(body, *expected_data);

        assert_eq!(total_bytes_uploaded, expected_bytes);

        Ok(())
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(coverage, ignore)] // https://github.com/MaterializeInc/database-issues/issues/5586
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_method` on OS `linux`
    #[ignore] // TODO: Reenable against minio so it can run locally
    async fn multi_part_upload_no_data() -> Result<(), S3MultiPartUploadError> {
        let sdk_config = defaults().load().await;
        let (bucket, key) = match s3_bucket_key_for_test() {
            Some(tuple) => tuple,
            None => return Ok(()),
        };

        let config = Default::default();
        let uploader =
            S3MultiPartUploader::try_new(&sdk_config, bucket.clone(), key.clone(), config).await?;

        // Calling finish without adding any data should succeed.
        uploader.finish().await.unwrap();

        // The file should exist but have no content.
        let s3_client = s3::new_client(&sdk_config);
        let uploaded_object = s3_client
            .get_object()
            .bucket(bucket)
            .key(key)
            .send()
            .await
            .unwrap();

        assert_eq!(uploaded_object.content_length(), Some(0));

        Ok(())
    }

    #[mz_ore::test]
    fn test_invalid_configs() {
        let config = S3MultiPartUploaderConfig {
            part_size_limit: ByteSize::mib(5).as_u64() - 1,
            file_size_limit: ByteSize::gib(5).as_u64(),
        };
        let error = config.validate().unwrap_err();

        assert_eq!(
            error.to_string(),
            "invalid part size: 5242879, should be between 5242880 and 5368709120 bytes"
        );

        let config = S3MultiPartUploaderConfig {
            part_size_limit: ByteSize::mib(5).as_u64(),
            // Subtracting 1 so that file_size_limit / part_size_limit is a fraction
            // between 10000 and 10001, to test that it is rounded up.
            file_size_limit: (ByteSize::mib(5).as_u64() * 10001) - 1,
        };
        let error = config.validate().unwrap_err();
        assert_eq!(
            error.to_string(),
            "total number of possible parts (file_size_limit / part_size_limit): 10001, cannot exceed 10000",
        );
    }
}