1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use anyhow::anyhow;
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError;
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError;
use aws_sdk_s3::operation::upload_part::UploadPartError;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
use aws_sdk_s3::Client;
use aws_types::sdk_config::SdkConfig;
use bytes::{Bytes, BytesMut};
use bytesize::ByteSize;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::task::{spawn, JoinHandle, JoinHandleExt};

/// A multi part uploader which can upload a single object across multiple parts
/// and keeps track of state to eventually finish the upload process.
/// The caller does not need to know the final number of parts beforehand.
///
/// The caller should get an instance by calling `S3MultiPartUploader::try_new` first.
/// Data is then added by calling `buffer_chunk` one or more times; whenever the
/// internal buffer grows beyond the configured `part_size_limit`, parts are split
/// off and their uploads are spawned as tasks. The multi part upload is eventually
/// finished by calling the `finish` method, which uploads whatever is still buffered
/// and waits for all the spawned part uploads.
#[derive(Debug)]
pub struct S3MultiPartUploader {
    // The S3 client used for the requests of this upload.
    client: Client,
    // Config settings for this particular multi part upload.
    config: S3MultiPartUploaderConfig,
    // The s3 bucket.
    bucket: String,
    // The s3 key of the file being uploaded.
    key: String,
    // The upload ID for the ongoing multi part upload.
    upload_id: String,
    // The number of parts handed off for upload so far, which is also the part
    // number of the most recently created part (the first part is number 1).
    part_count: i32,
    // Number of bytes handed off for upload till now; bytes still sitting in
    // `buffer` are not included.
    total_bytes_uploaded: u64,
    // A buffer to accumulate data till it exceeds `part_size_limit` in size, when it
    // will be uploaded as a part for the multi-part upload.
    buffer: BytesMut,
    // The task handles for each part upload. Each task resolves to the ETag
    // returned for the part (if any) together with the part number.
    upload_handles: Vec<JoinHandle<Result<(Option<String>, i32), S3MultiPartUploadError>>>,
}

/// The smallest allowable part number (inclusive).
///
/// `S3MultiPartUploader::finish` returns an error if fewer parts than this
/// have been created.
///
/// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
const AWS_S3_MIN_PART_COUNT: i32 = 1;
/// The largest allowable part number (inclusive).
///
/// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
pub const AWS_S3_MAX_PART_COUNT: i32 = 10_000;
/// The minimum size of a part in a multipart upload.
///
/// This minimum doesn't apply to the last chunk, which can be any size.
///
/// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
const AWS_S3_MIN_PART_SIZE: ByteSize = ByteSize::mib(5);
/// The maximum size of a part in a multipart upload.
///
/// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
const AWS_S3_MAX_PART_SIZE: ByteSize = ByteSize::gib(5);
/// The maximum size of an object in s3.
///
/// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
const AWS_S3_MAX_OBJECT_SIZE: ByteSize = ByteSize::tib(5);

/// Information about a completed multi part upload after `finish` is called.
#[derive(Debug)]
pub struct CompletedUpload {
    /// The total number of parts in the multi part upload.
    pub part_count: u32,
    /// The total number of bytes uploaded in the multi part upload.
    pub total_bytes_uploaded: u64,
    /// The s3 bucket the object was uploaded to.
    pub bucket: String,
    /// The s3 key of the uploaded object.
    pub key: String,
}

/// Configuration object to configure the behaviour of the `S3MultiPartUploader`.
///
/// Both limits are expressed in bytes and are checked against the S3 limits
/// when an uploader is created.
#[derive(Debug)]
pub struct S3MultiPartUploaderConfig {
    /// Size of data (in bytes) buffered in memory before being uploaded as a part.
    pub part_size_limit: u64,
    /// The max file size (in bytes) of the file uploaded to s3 by an
    /// `S3MultiPartUploader` instance.
    pub file_size_limit: u64,
}

impl S3MultiPartUploaderConfig {
    /// Default cap on the size of a file uploaded by one uploader. Callers can
    /// pick a different value by building the `S3MultiPartUploaderConfig` themselves.
    const DEFAULT_MAX_FILE_SIZE: ByteSize = ByteSize::gib(5);
    /// Default size of a single part. Callers can pick a different value by
    /// building the `S3MultiPartUploaderConfig` themselves.
    const DEFAULT_PART_SIZE_LIMIT: ByteSize = ByteSize::mib(10);

    /// Checks the config against the S3 limits:
    ///
    /// * the part size must lie between 5MiB and 5GiB (both inclusive),
    /// * the file size must not exceed 5TiB,
    /// * the file size divided by the part size (rounded up) must not exceed
    ///   the maximum number of parts.
    ///
    /// Returns an error describing the first violated limit.
    fn validate(&self) -> Result<(), anyhow::Error> {
        let part_size = self.part_size_limit;
        let file_size = self.file_size_limit;

        let min_part_size = AWS_S3_MIN_PART_SIZE.as_u64();
        let max_part_size = AWS_S3_MAX_PART_SIZE.as_u64();
        if !(min_part_size..=max_part_size).contains(&part_size) {
            return Err(anyhow!(
                "invalid part size: {}, should be between {} and {} bytes",
                part_size,
                min_part_size,
                max_part_size
            ));
        }

        let max_object_size = AWS_S3_MAX_OBJECT_SIZE.as_u64();
        if file_size > max_object_size {
            return Err(anyhow!(
                "invalid file size: {}, cannot exceed {} bytes",
                file_size,
                max_object_size
            ));
        }

        // A fractional quotient has to be rounded up (4.5 parts really means 5 parts),
        // hence `div_ceil` rather than a plain division.
        let parts_needed = file_size.div_ceil(part_size);
        let parts_allowed = u64::try_from(AWS_S3_MAX_PART_COUNT).expect("i32 to u64");
        if parts_needed > parts_allowed {
            return Err(anyhow!(
                "total number of possible parts (file_size_limit / part_size_limit): {}, cannot exceed {}",
                parts_needed,
                AWS_S3_MAX_PART_COUNT
            ));
        }

        Ok(())
    }
}

impl Default for S3MultiPartUploaderConfig {
    fn default() -> Self {
        Self {
            part_size_limit: Self::DEFAULT_PART_SIZE_LIMIT.as_u64(),
            file_size_limit: Self::DEFAULT_MAX_FILE_SIZE.as_u64(),
        }
    }
}

impl S3MultiPartUploader {
    /// Creates an instance of `S3MultiPartUploader` for the given `bucket` and `key`.
    ///
    /// This validates `config`, starts the multi part upload by making a
    /// `create_multipart_upload` call, and initializes all the internal state
    /// required to track the ongoing upload.
    ///
    /// # Errors
    ///
    /// Returns an error if `config` fails validation, if the
    /// `create_multipart_upload` request fails, or if its response does not
    /// carry an upload ID.
    pub async fn try_new(
        sdk_config: &SdkConfig,
        bucket: String,
        key: String,
        config: S3MultiPartUploaderConfig,
    ) -> Result<S3MultiPartUploader, S3MultiPartUploadError> {
        // Validate the config before talking to S3 at all.
        config.validate()?;

        let client = crate::s3::new_client(sdk_config);
        let res = client
            .create_multipart_upload()
            .bucket(&bucket)
            .key(&key)
            .send()
            .await?;
        // The error is built lazily so that nothing is constructed on the success path.
        let upload_id = res
            .upload_id()
            .ok_or_else(|| anyhow!("create_multipart_upload response missing upload id"))?
            .to_string();
        Ok(S3MultiPartUploader {
            client,
            bucket,
            key,
            upload_id,
            part_count: 0,
            total_bytes_uploaded: 0,
            buffer: Default::default(),
            config,
            upload_handles: Default::default(),
        })
    }

    /// Appends `data` to the internal buffer and then flushes the buffer as long
    /// as it holds more than the `part_size_limit` from `S3MultiPartUploaderConfig`.
    ///
    /// Returns an `UploadExceedsMaxFileLimit` error if accepting `data` would go
    /// past the configured `file_size_limit`. The one exception is the very first
    /// chunk (nothing added yet): it is accepted as long as its size does not
    /// exceed `part_size_limit` * 10000, capped at the maximum S3 object size.
    pub fn buffer_chunk(&mut self, data: &[u8]) -> Result<(), S3MultiPartUploadError> {
        let incoming = u64::cast_from(data.len());

        // The largest object this uploader could ever produce, regardless of
        // the configured `file_size_limit`.
        let max_parts: u64 = AWS_S3_MAX_PART_COUNT.try_into().expect("i32 to u64");
        let hard_cap =
            (self.config.part_size_limit * max_parts).min(AWS_S3_MAX_OBJECT_SIZE.as_u64());

        // While nothing has been added yet, a chunk of up to `hard_cap` bytes is still allowed.
        let fits_as_first_chunk = self.added_bytes() == 0 && incoming <= hard_cap;
        let fits_in_limit = incoming <= self.remaining_bytes_limit();

        if !(fits_in_limit || fits_as_first_chunk) {
            return Err(S3MultiPartUploadError::UploadExceedsMaxFileLimit(
                self.config.file_size_limit,
            ));
        }

        self.buffer.extend_from_slice(data);
        self.flush_chunks()
    }

    /// Method to finish the multi part upload. If the buffer is not empty,
    /// it uploads the buffered data as the last part, then waits for every
    /// spawned part upload and makes a call to `complete_multipart_upload`.
    ///
    /// Returns the number of parts and number of bytes uploaded, along with the
    /// bucket and key of the object.
    ///
    /// # Errors
    ///
    /// Returns `AtLeastMinPartNumber` if no data was ever added, and otherwise
    /// the first error hit by a part upload (including a missing ETag) or by the
    /// `complete_multipart_upload` request. Note that this method does not abort
    /// the multi part upload when it returns an error.
    pub async fn finish(mut self) -> Result<CompletedUpload, S3MultiPartUploadError> {
        if !self.buffer.is_empty() {
            let remaining = self.buffer.split();
            self.upload_part_internal(remaining.freeze())?;
        }

        if self.part_count < AWS_S3_MIN_PART_COUNT {
            return Err(S3MultiPartUploadError::AtLeastMinPartNumber);
        }

        // The handles were pushed in part-number order, so `parts` ends up in
        // ascending part-number order as well.
        let mut parts: Vec<CompletedPart> = Vec::with_capacity(self.upload_handles.len());
        for handle in self.upload_handles {
            let (etag, part_num) = handle.wait_and_assert_finished().await?;
            let etag = etag.ok_or_else(|| anyhow!("etag for part {part_num} is None"))?;
            parts.push(
                CompletedPart::builder()
                    .e_tag(etag)
                    .part_number(part_num)
                    .build(),
            );
        }

        // `self` is consumed by this method, so the upload ID can be moved
        // into the request instead of being cloned.
        self.client
            .complete_multipart_upload()
            .bucket(&self.bucket)
            .key(&self.key)
            .upload_id(self.upload_id)
            .multipart_upload(
                CompletedMultipartUpload::builder()
                    .set_parts(Some(parts))
                    .build(),
            )
            .send()
            .await?;
        Ok(CompletedUpload {
            part_count: self.part_count.try_into().expect("i32 to u32"),
            total_bytes_uploaded: self.total_bytes_uploaded,
            bucket: self.bucket,
            key: self.key,
        })
    }

    /// Internal method, returns the number of bytes currently held in the buffer.
    fn buffer_size(&self) -> u64 {
        let buffered = self.buffer.len();
        u64::cast_from(buffered)
    }

    /// Internal method, returns how many more bytes can be added to the multi-part
    /// upload before `file_size_limit` is exceeded. Returns 0 once the limit has
    /// been reached or passed.
    fn remaining_bytes_limit(&self) -> u64 {
        let limit = self.config.file_size_limit;
        let used = self.added_bytes();
        limit.saturating_sub(used)
    }

    /// Returns the number of bytes added till now, i.e. the bytes already handed
    /// off for upload plus the bytes still waiting in the buffer.
    pub fn added_bytes(&self) -> u64 {
        let buffered = self.buffer_size();
        self.total_bytes_uploaded + buffered
    }

    /// Internal method which keeps splitting `part_size_limit` sized parts off the
    /// front of the buffer and uploading them, until the buffer holds no more than
    /// the configured `part_size_limit`.
    fn flush_chunks(&mut self) -> Result<(), S3MultiPartUploadError> {
        let limit = self.config.part_size_limit;
        // TODO (mouli): can probably parallelize the calls here.
        loop {
            if self.buffer_size() <= limit {
                return Ok(());
            }
            let part = self.buffer.split_to(usize::cast_from(limit)).freeze();
            self.upload_part_internal(part)?;
        }
    }

    /// Internal method which spawns the upload of a single part and updates the
    /// bookkeeping (`part_count`, `total_bytes_uploaded`, `upload_handles`).
    ///
    /// Returns `ExceedsMaxPartNumber` without spawning anything if the part would
    /// get a part number greater than `AWS_S3_MAX_PART_COUNT`. Errors of the upload
    /// itself only surface when the stored task handle is awaited.
    fn upload_part_internal(&mut self, data: Bytes) -> Result<(), S3MultiPartUploadError> {
        let part_number = self.part_count + 1;
        if part_number > AWS_S3_MAX_PART_COUNT {
            return Err(S3MultiPartUploadError::ExceedsMaxPartNumber);
        }
        let part_len = u64::cast_from(data.len());

        // The spawned task has to own everything it uses.
        let (client, bucket, key, upload_id) = (
            self.client.clone(),
            self.bucket.clone(),
            self.key.clone(),
            self.upload_id.clone(),
        );
        let handle = spawn(|| "s3::upload_part", async move {
            let response = client
                .upload_part()
                .bucket(&bucket)
                .key(&key)
                .upload_id(upload_id)
                .part_number(part_number)
                .body(ByteStream::from(data))
                .send()
                .await?;
            Ok((response.e_tag, part_number))
        });

        self.upload_handles.push(handle);
        self.total_bytes_uploaded += part_len;
        self.part_count = part_number;
        Ok(())
    }
}

/// Errors which can be returned while creating, feeding or finishing an
/// `S3MultiPartUploader`.
#[derive(thiserror::Error, Debug)]
pub enum S3MultiPartUploadError {
    /// A new part would get a part number greater than `AWS_S3_MAX_PART_COUNT`.
    #[error(
        "multi-part upload cannot have more than {} parts",
        AWS_S3_MAX_PART_COUNT
    )]
    ExceedsMaxPartNumber,
    /// The upload was finished with fewer than `AWS_S3_MIN_PART_COUNT` parts.
    #[error(
        "multi-part upload should have at least {} part",
        AWS_S3_MIN_PART_COUNT
    )]
    AtLeastMinPartNumber,
    /// Adding the data would exceed the configured `file_size_limit`; the
    /// payload is that limit in bytes.
    #[error("multi-part upload will exceed configured file_size_limit: {} bytes", .0)]
    UploadExceedsMaxFileLimit(u64),
    /// The `create_multipart_upload` request failed.
    #[error("{}", .0.display_with_causes())]
    CreateMultipartUploadError(#[from] SdkError<CreateMultipartUploadError>),
    /// An `upload_part` request failed.
    #[error("{}", .0.display_with_causes())]
    UploadPartError(#[from] SdkError<UploadPartError>),
    /// The `complete_multipart_upload` request failed.
    #[error("{}", .0.display_with_causes())]
    CompleteMultipartUploadError(#[from] SdkError<CompleteMultipartUploadError>),
    /// Any other error, e.g. an invalid config or a response missing an
    /// expected field.
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}

/// On CI, these tests are enabled by adding the scratch-aws-access plugin
/// to the `cargo-test` step in `ci/test/pipeline.template.yml` and setting
/// `MZ_S3_UPLOADER_TEST_S3_BUCKET` in
/// `ci/test/cargo-test/mzcompose.py`.
///
/// For a Materialize developer, to opt in to these tests locally for
/// development, follow the AWS access guide:
///
/// ```text
/// https://www.notion.so/materialize/AWS-access-5fbd9513dcdc4e11a7591e8caa5f63fe
/// ```
///
/// then run `source src/aws-util/src/setup_test_env_mz.sh`. You will also have
/// to run `aws sso login` if you haven't done so recently.
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use uuid::Uuid;

    use super::*;
    use crate::{defaults, s3};

    /// Returns the bucket named by `MZ_S3_UPLOADER_TEST_S3_BUCKET` together with a
    /// fresh, UUID-prefixed key, or `None` when the variable is not set so that
    /// the caller can skip the test. Panics instead if the variable is missing
    /// while `CI` is truthy.
    fn s3_bucket_key_for_test() -> Option<(String, String)> {
        let Ok(bucket) = std::env::var("MZ_S3_UPLOADER_TEST_S3_BUCKET") else {
            if mz_ore::env::is_var_truthy("CI") {
                panic!("CI is supposed to run this test but something has gone wrong!");
            }
            return None;
        };

        let key = format!("cargo_test/{}/file", Uuid::new_v4());
        Some((bucket, key))
    }

    // Uploads three small chunks, which all fit into a single part, and checks
    // the part count, the object contents and the reported byte count against
    // what is read back from S3. Skipped (returns `Ok`) when no test bucket is
    // configured.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(coverage, ignore)] // https://github.com/MaterializeInc/materialize/issues/18898
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_method` on OS `linux`
    async fn multi_part_upload_success() -> Result<(), S3MultiPartUploadError> {
        let sdk_config = defaults().load().await;
        let (bucket, key) = match s3_bucket_key_for_test() {
            Some(tuple) => tuple,
            None => return Ok(()),
        };

        let config = S3MultiPartUploaderConfig::default();
        let mut uploader =
            S3MultiPartUploader::try_new(&sdk_config, bucket.clone(), key.clone(), config).await?;

        // The chunks are far below the part size limit, so they only get buffered here.
        let expected_data = "onetwothree";
        uploader.buffer_chunk(b"one")?;
        uploader.buffer_chunk(b"two")?;
        uploader.buffer_chunk(b"three")?;

        // This should trigger one single part upload.
        let CompletedUpload {
            part_count,
            total_bytes_uploaded,
            bucket: _,
            key: _,
        } = uploader.finish().await?;

        // Getting the uploaded object from s3 and validating the contents.
        let s3_client = s3::new_client(&sdk_config);
        let uploaded_object = s3_client
            .get_object()
            .bucket(bucket)
            .key(key)
            .part_number(1) // fetching a particular part, so that the `parts_count` is populated in the result
            .send()
            .await
            .unwrap();

        let uploaded_parts_count: u32 = uploaded_object.parts_count().unwrap().try_into().unwrap();
        assert_eq!(uploaded_parts_count, part_count);
        assert_eq!(part_count, 1);

        // The object consists of a single part, so part 1 is the whole object.
        let body = uploaded_object.body.collect().await.unwrap().into_bytes();
        assert_eq!(body, expected_data);

        let expected_bytes: u64 = Bytes::from(expected_data).len().try_into().unwrap();
        assert_eq!(total_bytes_uploaded, expected_bytes);

        Ok(())
    }

    // Uploads a chunk larger than the part size limit and checks that it gets
    // split into two parts, that a further chunk which would exceed the file
    // size limit is rejected, and that the object read back from S3 matches.
    // Skipped (returns `Ok`) when no test bucket is configured.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(coverage, ignore)] // https://github.com/MaterializeInc/materialize/issues/18898
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_method` on OS `linux`
    async fn multi_part_upload_buffer() -> Result<(), S3MultiPartUploadError> {
        let sdk_config = defaults().load().await;
        let (bucket, key) = match s3_bucket_key_for_test() {
            Some(tuple) => tuple,
            None => return Ok(()),
        };

        let config = S3MultiPartUploaderConfig {
            part_size_limit: ByteSize::mib(5).as_u64(),
            file_size_limit: ByteSize::mib(10).as_u64(),
        };
        let mut uploader =
            S3MultiPartUploader::try_new(&sdk_config, bucket.clone(), key.clone(), config).await?;

        // Adding a chunk of 6MiB, should trigger an upload part since part_size_limit is 5MiB
        let expected_data = vec![97; 6291456]; // 6MiB
        let expected_bytes: u64 = u64::cast_from(expected_data.len());
        uploader.buffer_chunk(&expected_data)?;

        // 6MiB of the 10MiB file_size_limit are used up (5MiB handed off, 1MiB buffered).
        assert_eq!(uploader.remaining_bytes_limit(), ByteSize::mib(4).as_u64());

        // Adding another 6MiB should return an error since file_size_limit is 10MiB
        let error = uploader.buffer_chunk(&expected_data).unwrap_err();
        assert!(matches!(
            error,
            S3MultiPartUploadError::UploadExceedsMaxFileLimit(_)
        ));

        // The rejected chunk was not buffered, so only the first 6MiB get uploaded.
        let CompletedUpload {
            part_count,
            total_bytes_uploaded,
            bucket: _,
            key: _,
        } = uploader.finish().await?;

        // Getting the uploaded object from s3 and validating the contents.
        let s3_client = s3::new_client(&sdk_config);
        let uploaded_object = s3_client
            .get_object()
            .bucket(bucket)
            .key(key)
            .send()
            .await
            .unwrap();

        assert_eq!(part_count, 2); // 6MiB should be split into two parts, 5MiB and 1MiB

        let body = uploaded_object.body.collect().await.unwrap().into_bytes();
        assert_eq!(body, *expected_data);

        assert_eq!(total_bytes_uploaded, expected_bytes);

        Ok(())
    }

    // Checks that finishing an upload to which no data was ever added fails
    // with the "at least 1 part" error. Skipped (returns `Ok`) when no test
    // bucket is configured.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(coverage, ignore)] // https://github.com/MaterializeInc/materialize/issues/18898
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_method` on OS `linux`
    async fn multi_part_upload_error() -> Result<(), S3MultiPartUploadError> {
        let sdk_config = defaults().load().await;
        let (bucket, key) = match s3_bucket_key_for_test() {
            Some(tuple) => tuple,
            None => return Ok(()),
        };

        let config = Default::default();
        let uploader =
            S3MultiPartUploader::try_new(&sdk_config, bucket.clone(), key.clone(), config).await?;

        // Calling finish without adding any data should error
        let err = uploader.finish().await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "multi-part upload should have at least 1 part"
        );

        Ok(())
    }

    // Checks the error messages `validate` produces for a part size below the
    // minimum and for a file size which would need more than 10000 parts.
    #[mz_ore::test]
    fn test_invalid_configs() {
        let min_part_size = ByteSize::mib(5).as_u64();

        // One byte less than the minimum part size.
        let part_too_small = S3MultiPartUploaderConfig {
            part_size_limit: min_part_size - 1,
            file_size_limit: ByteSize::gib(5).as_u64(),
        };
        let message = part_too_small.validate().unwrap_err().to_string();
        assert_eq!(
            message,
            "invalid part size: 5242879, should be between 5242880 and 5368709120 bytes"
        );

        // One byte short of 10001 full parts: the quotient lies strictly between
        // 10000 and 10001, which exercises the rounding up of the part count.
        let too_many_parts = S3MultiPartUploaderConfig {
            part_size_limit: min_part_size,
            file_size_limit: (min_part_size * 10001) - 1,
        };
        let message = too_many_parts.validate().unwrap_err().to_string();
        assert_eq!(
            message,
            "total number of possible parts (file_size_limit / part_size_limit): 10001, cannot exceed 10000",
        );
    }
}