mz_persist/s3.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! An S3 implementation of [Blob] storage.

use std::cmp;
use std::fmt::{Debug, Formatter};
use std::ops::Range;
use std::sync::Arc;
use std::sync::atomic::{self, AtomicU64};
use std::time::{Duration, Instant};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use aws_config::sts::AssumeRoleProvider;
use aws_config::timeout::TimeoutConfig;
use aws_credential_types::Credentials;
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::config::{AsyncSleep, Sleep};
use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
use aws_types::region::Region;
use bytes::Bytes;
use futures_util::stream::FuturesOrdered;
use futures_util::{FutureExt, StreamExt};
use mz_dyncfg::{Config, ConfigSet};
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::CastFrom;
use mz_ore::lgbytes::MetricsRegion;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::task::RuntimeExt;
use tokio::runtime::Handle as AsyncHandle;
use tracing::{Instrument, debug, debug_span, trace, trace_span};
use uuid::Uuid;

use crate::cfg::BlobKnobs;
use crate::error::Error;
use crate::location::{Blob, BlobMetadata, Determinate, ExternalError};
use crate::metrics::S3BlobMetrics;

/// Configuration for opening an [S3Blob].
#[derive(Clone, Debug)]
pub struct S3BlobConfig {
    metrics: S3BlobMetrics,
    client: S3Client,
    bucket: String,
    prefix: String,
    cfg: Arc<ConfigSet>,
    is_cc_active: bool,
}

// There is no simple way to hook into the S3 client to capture when its various timeouts
// are hit. Instead, we pass along marker values that inform our [MetricsSleep] impl which
// type of timeout was requested so it can substitute in a dynamic value set by config
// from the caller.
const OPERATION_TIMEOUT_MARKER: Duration = Duration::new(111, 1111);
const OPERATION_ATTEMPT_TIMEOUT_MARKER: Duration = Duration::new(222, 2222);
const CONNECT_TIMEOUT_MARKER: Duration = Duration::new(333, 3333);
const READ_TIMEOUT_MARKER: Duration = Duration::new(444, 4444);

#[derive(Debug)]
struct MetricsSleep {
    knobs: Box<dyn BlobKnobs>,
    metrics: S3BlobMetrics,
}

impl AsyncSleep for MetricsSleep {
    fn sleep(&self, duration: Duration) -> Sleep {
        let (duration, metric) = match duration {
            OPERATION_TIMEOUT_MARKER => (
                self.knobs.operation_timeout(),
                Some(self.metrics.operation_timeouts.clone()),
            ),
            OPERATION_ATTEMPT_TIMEOUT_MARKER => (
                self.knobs.operation_attempt_timeout(),
                Some(self.metrics.operation_attempt_timeouts.clone()),
            ),
            CONNECT_TIMEOUT_MARKER => (
                self.knobs.connect_timeout(),
                Some(self.metrics.connect_timeouts.clone()),
            ),
            READ_TIMEOUT_MARKER => (
                self.knobs.read_timeout(),
                Some(self.metrics.read_timeouts.clone()),
            ),
            duration => (duration, None),
        };

        // the sleep future we return here will only be polled to
        // completion if its corresponding http request to S3 times
        // out, meaning we can chain incrementing the appropriate
        // timeout counter to when it finishes
        Sleep::new(tokio::time::sleep(duration).map(|x| {
            if let Some(counter) = metric {
                counter.inc();
            }
            x
        }))
    }
}

impl S3BlobConfig {
    const EXTERNAL_TESTS_S3_BUCKET: &'static str = "MZ_PERSIST_EXTERNAL_STORAGE_TEST_S3_BUCKET";

    /// Returns a new [S3BlobConfig] for use in production.
    ///
    /// Stores objects in the given bucket prepended with the (possibly empty)
    /// prefix. S3 credentials and region must be available in the process or
    /// environment.
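    ///
    /// A minimal sketch of how a caller might wire this up (the `knobs`,
    /// `metrics`, and `cfg` values stand in for whatever the embedding process
    /// already has on hand and are not defined in this snippet):
    ///
    /// ```ignore
    /// let config = S3BlobConfig::new(
    ///     "my-bucket".to_string(),
    ///     "my-prefix".to_string(),
    ///     None,                          // role_arn
    ///     None,                          // endpoint
    ///     Some("us-east-1".to_string()), // region
    ///     None,                          // credentials
    ///     knobs,
    ///     metrics,
    ///     cfg,
    /// )
    /// .await?;
    /// let blob = S3Blob::open(config).await?;
    /// ```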
    pub async fn new(
        bucket: String,
        prefix: String,
        role_arn: Option<String>,
        endpoint: Option<String>,
        region: Option<String>,
        credentials: Option<(String, String)>,
        knobs: Box<dyn BlobKnobs>,
        metrics: S3BlobMetrics,
        cfg: Arc<ConfigSet>,
    ) -> Result<Self, Error> {
        let is_cc_active = knobs.is_cc_active();
        let mut loader = mz_aws_util::defaults();

        if let Some(region) = region {
            loader = loader.region(Region::new(region));
        };

        if let Some(role_arn) = role_arn {
            let assume_role_sdk_config = mz_aws_util::defaults().load().await;
            let role_provider = AssumeRoleProvider::builder(role_arn)
                .configure(&assume_role_sdk_config)
                .session_name("persist")
                .build()
                .await;
            loader = loader.credentials_provider(role_provider);
        }

        if let Some((access_key_id, secret_access_key)) = credentials {
            loader = loader.credentials_provider(Credentials::from_keys(
                access_key_id,
                secret_access_key,
                None,
            ));
        }

        if let Some(endpoint) = endpoint {
            loader = loader.endpoint_url(endpoint)
        }

        // NB: we must always use the custom sleep impl if we use the timeout marker values
        loader = loader.sleep_impl(MetricsSleep {
            knobs,
            metrics: metrics.clone(),
        });
        loader = loader.timeout_config(
            TimeoutConfig::builder()
                // maximum time allowed for a top-level S3 API call (including internal retries)
                .operation_timeout(OPERATION_TIMEOUT_MARKER)
                // maximum time allowed for a single network call
                .operation_attempt_timeout(OPERATION_ATTEMPT_TIMEOUT_MARKER)
                // maximum time until a connection succeeds
                .connect_timeout(CONNECT_TIMEOUT_MARKER)
                // maximum time to read the first byte of a response
                .read_timeout(READ_TIMEOUT_MARKER)
                .build(),
        );

        let client = mz_aws_util::s3::new_client(&loader.load().await);
        Ok(S3BlobConfig {
            metrics,
            client,
            bucket,
            prefix,
            cfg,
            is_cc_active,
        })
    }

    /// Returns a new [S3BlobConfig] for use in unit tests.
    ///
    /// By default, persist tests that use external storage (like s3) are
    /// no-ops, so that `cargo test` does the right thing without any
    /// configuration. To activate the tests, set the
    /// `MZ_PERSIST_EXTERNAL_STORAGE_TEST_S3_BUCKET` environment variable and
    /// ensure you have valid AWS credentials available in a location where the
    /// AWS Rust SDK can discover them.
    ///
    /// This intentionally uses the `MZ_PERSIST_EXTERNAL_STORAGE_TEST_S3_BUCKET`
    /// env var as the switch for test no-op-ness instead of the presence of
    /// valid AWS authentication configuration envs, because a developer might
    /// have valid credentials present and that isn't an explicit enough signal
    /// from a developer running `cargo test` that it's okay to use these
    /// credentials. It also intentionally does not use the local drop-in s3
    /// replacement to keep persist unit tests light.
    ///
    /// On CI, these tests are enabled by adding the scratch-aws-access plugin
    /// to the `cargo-test` step in `ci/test/pipeline.template.yml` and setting
    /// `MZ_PERSIST_EXTERNAL_STORAGE_TEST_S3_BUCKET` in
    /// `ci/test/cargo-test/mzcompose.py`.
    ///
    /// For a Materialize developer, to opt in to these tests locally for
    /// development, follow the AWS access guide:
    ///
    /// ```text
    /// https://github.com/MaterializeInc/i2/blob/main/doc/aws-access.md
    /// ```
    ///
    /// then run `source src/persist/s3_test_env_mz.sh`. You will also have
    /// to run `aws sso login` if you haven't recently.
    ///
    /// Non-Materialize developers will have to set up their own auto-deleting
    /// bucket and export the same env vars that s3_test_env_mz.sh does.
    ///
    /// Only public for use in src/benches.
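    ///
    /// For example, something like the following enables the S3-backed test
    /// locally (the exact package and test filter names may differ):
    ///
    /// ```text
    /// MZ_PERSIST_EXTERNAL_STORAGE_TEST_S3_BUCKET=my-scratch-bucket \
    ///     cargo test -p mz-persist s3_blob
    /// ```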
    pub async fn new_for_test() -> Result<Option<Self>, Error> {
        let bucket = match std::env::var(Self::EXTERNAL_TESTS_S3_BUCKET) {
            Ok(bucket) => bucket,
            Err(_) => {
                if mz_ore::env::is_var_truthy("CI") {
                    panic!("CI is supposed to run this test but something has gone wrong!");
                }
                return Ok(None);
            }
        };

        struct TestBlobKnobs;
        impl std::fmt::Debug for TestBlobKnobs {
            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
                f.debug_struct("TestBlobKnobs").finish_non_exhaustive()
            }
        }
        impl BlobKnobs for TestBlobKnobs {
            fn operation_timeout(&self) -> Duration {
                OPERATION_TIMEOUT_MARKER
            }

            fn operation_attempt_timeout(&self) -> Duration {
                OPERATION_ATTEMPT_TIMEOUT_MARKER
            }

            fn connect_timeout(&self) -> Duration {
                CONNECT_TIMEOUT_MARKER
            }

            fn read_timeout(&self) -> Duration {
                READ_TIMEOUT_MARKER
            }

            fn is_cc_active(&self) -> bool {
                false
            }
        }

        // Give each test a unique prefix so they don't conflict. We don't have
        // to worry about deleting any data that we create because the bucket is
        // set to auto-delete after 1 day.
        let prefix = Uuid::new_v4().to_string();
        let role_arn = None;
        let metrics = S3BlobMetrics::new(&MetricsRegistry::new());
        let config = S3BlobConfig::new(
            bucket,
            prefix,
            role_arn,
            None,
            None,
            None,
            Box::new(TestBlobKnobs),
            metrics,
            Arc::new(
                ConfigSet::default()
                    .add(&ENABLE_S3_LGALLOC_CC_SIZES)
                    .add(&ENABLE_S3_LGALLOC_NONCC_SIZES),
            ),
        )
        .await?;
        Ok(Some(config))
    }

    /// Returns a clone of Self with a new v4 uuid prefix.
    pub fn clone_with_new_uuid_prefix(&self) -> Self {
        let mut ret = self.clone();
        ret.prefix = Uuid::new_v4().to_string();
        ret
    }
}

/// Implementation of [Blob] backed by S3.
#[derive(Debug)]
pub struct S3Blob {
    metrics: S3BlobMetrics,
    client: S3Client,
    bucket: String,
    prefix: String,
    // Maximum number of keys we get information about per list-objects request.
    //
    // Defaults to 1000 which is the current AWS max.
    max_keys: i32,
    multipart_config: MultipartConfig,
    cfg: Arc<ConfigSet>,
    is_cc_active: bool,
}

impl S3Blob {
    /// Opens the given location for non-exclusive read-write access.
    pub async fn open(config: S3BlobConfig) -> Result<Self, ExternalError> {
        let ret = S3Blob {
            metrics: config.metrics,
            client: config.client,
            bucket: config.bucket,
            prefix: config.prefix,
            max_keys: 1_000,
            multipart_config: MultipartConfig::default(),
            cfg: config.cfg,
            is_cc_active: config.is_cc_active,
        };
        // Connect before returning success. We don't particularly care about
        // what's stored in this blob (nothing writes to it, so presumably it's
        // empty), just that we were able and allowed to fetch it.
        let _ = ret.get("HEALTH_CHECK").await?;
        Ok(ret)
    }

    fn get_path(&self, key: &str) -> String {
        format!("{}/{}", self.prefix, key)
    }
}

pub(crate) const ENABLE_S3_LGALLOC_CC_SIZES: Config<bool> = Config::new(
    "persist_enable_s3_lgalloc_cc_sizes",
    true,
    "An incident flag to disable copying fetched s3 data into lgalloc on cc sized clusters.",
);

pub(crate) const ENABLE_S3_LGALLOC_NONCC_SIZES: Config<bool> = Config::new(
    "persist_enable_s3_lgalloc_noncc_sizes",
    false,
    "A feature flag to enable copying fetched s3 data into lgalloc on non-cc sized clusters.",
);

#[async_trait]
impl Blob for S3Blob {
    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
        let start_overall = Instant::now();
        let path = self.get_path(key);

        // S3 advises that it's fastest to download large objects along the part
        // boundaries they were originally uploaded with [1].
        //
        // [1]: https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/use-byte-range-fetches.html
        //
        // One option is to run the same logic as multipart does and do the
        // requests using the resulting byte ranges, but if we ever changed the
        // multipart chunking logic, they wouldn't line up for old blobs written
        // by a previous version.
        //
        // Another option is to store the part boundaries in the metadata we
        // keep about the batch, but this would be large and wasteful.
        //
        // Luckily, s3 exposes a part_number param on GetObject requests that we
        // can use. If an object was created with multipart, it allows
        // requesting each part as they were originally uploaded by the part
        // number index. With this, we can simply send off requests for part
        // number 1..=num_parts and reassemble the results.
        //
        // We could roundtrip the number of parts through persist batch
        // metadata, but with some cleverness, we can avoid even this. Turns
        // out, if multipart upload wasn't used (it was just a normal PutObject
        // request), s3 will still happily return it for a request specifying a
        // part_number of 1. This lets us fire off a first request, which
        // contains the metadata we need to determine how many additional parts
        // we need, if any.
        //
        // So, the following call sends this first request. The SDK even returns
        // the headers before the full data body has completed. This gives us
        // the number of parts. We can then proceed to fetch the body of the
        // first request concurrently with the rest of the parts of the object.

        // For each header and body that we fetch, we track the fastest, and
        // any large deviations from it.
        let min_body_elapsed = Arc::new(MinElapsed::default());
        let min_header_elapsed = Arc::new(MinElapsed::default());
        self.metrics.get_part.inc();

        // Fetch our first header; this tells us how many more are left.
        let header_start = Instant::now();
        let object = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(&path)
            .part_number(1)
            .send()
            .await;
        let elapsed = header_start.elapsed();
        min_header_elapsed.observe(elapsed, "s3 download first part header");

        let first_part = match object {
            Ok(object) => object,
            Err(SdkError::ServiceError(err)) if err.err().is_no_such_key() => return Ok(None),
            Err(err) => {
                self.update_error_metrics("GetObject", &err);
                Err(anyhow!(err).context("s3 get meta err"))?
            }
        };

        // Get the total number of parts.
        let num_parts = match first_part.parts_count() {
            // For a non-multipart upload, parts_count will be None. The rest of the code works
            // perfectly well if we just pretend this was a multipart upload of 1 part.
            None => 1,
            // For any positive number of parts, use it as-is.
            Some(parts @ 1..) => parts,
            // A non-positive value is invalid.
            Some(bad) => {
                assert!(bad <= 0);
                return Err(anyhow!("unexpected number of s3 object parts: {}", bad).into());
            }
        };

        trace!(
            "s3 download first header took {:?} ({num_parts} parts)",
            start_overall.elapsed(),
        );

        let mut body_futures = FuturesOrdered::new();
        let mut first_part = Some(first_part);

        // Fetch the bodies of all parts, reusing the headers we already have
        // for part 1 and requesting headers for the rest.
        for part_num in 1..=num_parts {
            // Clone a handle to our MinElapsed trackers so we can give one to
            // each download task.
            let min_header_elapsed = Arc::clone(&min_header_elapsed);
            let min_body_elapsed = Arc::clone(&min_body_elapsed);
            let get_invalid_resp = self.metrics.get_invalid_resp.clone();
            let first_part = first_part.take();
            let path = &path;
            let request_future = async move {
                // Fetch the headers of the rest of the parts. (Using the existing headers
                // for part 1.)
                let mut object = match first_part {
                    Some(first_part) => {
                        assert_eq!(part_num, 1, "only the first part should be prefetched");
                        first_part
                    }
                    None => {
                        assert_ne!(part_num, 1, "first part should be prefetched");
                        // Request our headers.
                        let header_start = Instant::now();
                        let object = self
                            .client
                            .get_object()
                            .bucket(&self.bucket)
                            .key(path)
                            .part_number(part_num)
                            .send()
                            .await
                            .inspect_err(|err| self.update_error_metrics("GetObject", err))
                            .context("s3 get meta err")?;
                        min_header_elapsed
                            .observe(header_start.elapsed(), "s3 download part header");
                        object
                    }
                };

                // Request the body.
                let body_start = Instant::now();
                let mut body_parts: Vec<Bytes> = Vec::new();

                // Get the data into lgalloc at the absolute earliest possible
                // point without (yet) having to fork the s3 client library.
                let enable_s3_lgalloc = if self.is_cc_active {
                    ENABLE_S3_LGALLOC_CC_SIZES.get(&self.cfg)
                } else {
                    ENABLE_S3_LGALLOC_NONCC_SIZES.get(&self.cfg)
                };

                // Copy all of the bytes off the network and into a single allocation.
                let mut buffer = match object.content_length() {
                    Some(len @ 1..) => {
                        let len: u64 = len.try_into().expect("positive integer");
                        // N.B. `lgalloc` cannot reallocate so we need to make sure the initial
                        // allocation is large enough to fit the entire blob.
                        let buf: MetricsRegion<u8> = self
                            .metrics
                            .lgbytes
                            .persist_s3
                            .new_region(usize::cast_from(len));
                        Some(buf)
                    }
                    // A negative content-length is invalid; fall back to buffering the
                    // chunks individually. (A content-length of 0 isn't necessarily
                    // invalid, it just skips the single allocation.)
                    Some(len @ ..=-1) => {
                        tracing::trace!(?len, "found invalid content-length, falling back");
                        get_invalid_resp.inc();
                        None
                    }
                    Some(0) | None => None,
                };

                while let Some(data) = object.body.next().await {
                    let data = data.context("s3 get body err")?;
                    match &mut buffer {
                        // Write to our single allocation, if it's enabled.
                        Some(buf) => buf.extend_from_slice(&data[..]),
                        // Fall back to spilling into lgalloc as quickly as possible.
                        None if enable_s3_lgalloc => {
                            body_parts.push(self.metrics.lgbytes.persist_s3.try_mmap_bytes(data));
                        }
                        // If all else fails, just heap allocate.
                        None => {
                            // In the CYA fallback case, make sure we skip the
                            // memcpy to preserve the previous behavior as closely
                            // as possible.
                            //
                            // TODO: Once we've validated the LgBytes path, change
                            // this fallback path to be a heap allocated LgBytes.
                            // Then we can remove the pub from MaybeLgBytes.
                            body_parts.push(data);
                        }
                    }
                }

                // Append our single segment, if it exists.
                if let Some(body) = buffer {
                    // If we're writing into a single buffer we shouldn't have
                    // pushed anything else into our segments.
                    assert!(body_parts.is_empty());
                    body_parts.push(body.into());
                }

                let body_elapsed = body_start.elapsed();
                min_body_elapsed.observe(body_elapsed, "s3 download part body");

                Ok::<_, anyhow::Error>(body_parts)
            };

            body_futures.push_back(request_future);
        }

        // Await on all of our parts requests.
        let mut segments = vec![];
        while let Some(result) = body_futures.next().await {
            // A failure here means we failed to fetch the part body from S3.
            let mut part_body = result
                .inspect_err(|e| {
                    self.metrics
                        .error_counts
                        .with_label_values(&["GetObjectStream", e.to_string().as_str()])
                        .inc()
                })
                .context("s3 get body err")?;

            // Collect all of our segments.
            segments.append(&mut part_body);
        }

        debug!(
            "s3 GetObject took {:?} ({} parts)",
            start_overall.elapsed(),
            num_parts
        );
        Ok(Some(SegmentedBytes::from(segments)))
    }

    async fn list_keys_and_metadata(
        &self,
        key_prefix: &str,
        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
    ) -> Result<(), ExternalError> {
        let mut continuation_token = None;
        // we only want to return keys that match the specified blob key prefix
        let blob_key_prefix = self.get_path(key_prefix);
        // but we want to exclude the shared root prefix from our returned keys,
        // so only the blob key itself is passed in to `f`
        let strippable_root_prefix = format!("{}/", self.prefix);

        loop {
            self.metrics.list_objects.inc();
            let resp = self
                .client
                .list_objects_v2()
                .bucket(&self.bucket)
                .prefix(&blob_key_prefix)
                .max_keys(self.max_keys)
                .set_continuation_token(continuation_token)
                .send()
                .await
                .inspect_err(|err| self.update_error_metrics("ListObjectsV2", err))
                .context("list bucket error")?;
            if let Some(contents) = resp.contents {
                for object in contents.iter() {
                    if let Some(key) = object.key.as_ref() {
                        if let Some(key) = key.strip_prefix(&strippable_root_prefix) {
                            let size_in_bytes = match object.size {
                                None => {
                                    return Err(ExternalError::from(anyhow!(
                                        "object missing size: {key}"
                                    )));
                                }
                                Some(size) => size
                                    .try_into()
                                    .expect("file in S3 cannot have negative size"),
                            };
                            f(BlobMetadata { key, size_in_bytes });
                        } else {
                            return Err(ExternalError::from(anyhow!(
                                "found key with invalid prefix: {}",
                                key
                            )));
                        }
                    }
                }
            }

            if resp.next_continuation_token.is_some() {
                continuation_token = resp.next_continuation_token;
            } else {
                break;
            }
        }

        Ok(())
    }

    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
        let value_len = value.len();
        if self
            .multipart_config
            .should_multipart(value_len)
            .map_err(anyhow::Error::msg)?
        {
            self.set_multi_part(key, value)
                .instrument(debug_span!("s3set_multi", payload_len = value_len))
                .await
        } else {
            self.set_single_part(key, value).await
        }
    }

    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
        // There is a race condition here where, if two delete calls for the
        // same key occur simultaneously, both might think they did the actual
        // deletion. This return value is only used for metrics, so it's
        // unfortunate, but fine.
        let path = self.get_path(key);
        self.metrics.delete_head.inc();
        let head_res = self
            .client
            .head_object()
            .bucket(&self.bucket)
            .key(&path)
            .send()
            .await;
        let size_bytes = match head_res {
            Ok(x) => match x.content_length {
                None => {
                    return Err(ExternalError::from(anyhow!(
                        "s3 delete content length was none"
                    )));
                }
                Some(content_length) => {
                    u64::try_from(content_length).expect("file in S3 cannot have negative size")
                }
            },
            Err(SdkError::ServiceError(err)) if err.err().is_not_found() => return Ok(None),
            Err(err) => {
                self.update_error_metrics("HeadObject", &err);
                return Err(ExternalError::from(
                    anyhow!(err).context("s3 delete head err"),
                ));
            }
        };
        self.metrics.delete_object.inc();
        let _ = self
            .client
            .delete_object()
            .bucket(&self.bucket)
            .key(&path)
            .send()
            .await
            .inspect_err(|err| self.update_error_metrics("DeleteObject", err))
            .context("s3 delete object err")?;
        Ok(Some(usize::cast_from(size_bytes)))
    }

    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
        let path = self.get_path(key);
        // Fetch the latest version of the object. If it's a normal version, we're done;
        // if it's a delete marker, delete the marker and loop; if there is no such
        // version at all, return an error.
        // TODO: limit the number of delete markers we'll peel back?
        loop {
            // S3 only lets us fetch the versions of an object with a list request.
            // Seems a bit wasteful to just fetch one at a time, but otherwise we can only
            // guess the order of versions via the timestamp, and that feels brittle.
            let list_res = self
                .client
                .list_object_versions()
                .bucket(&self.bucket)
                .prefix(&path)
                .max_keys(1)
                .send()
                .await
                .inspect_err(|err| self.update_error_metrics("ListObjectVersions", err))
                .context("listing object versions during restore")?;

            let current_delete = list_res
                .delete_markers()
                .into_iter()
                .filter(|d| {
                    // We need to check that any versions we're looking at have the right key,
                    // not just a key with our key as a prefix.
                    d.key() == Some(path.as_str())
                })
                .find(|d| d.is_latest().unwrap_or(false))
                .and_then(|d| d.version_id());

            if let Some(version) = current_delete {
                let deleted = self
                    .client
                    .delete_object()
                    .bucket(&self.bucket)
                    .key(&path)
                    .version_id(version)
                    .send()
                    .await
                    .inspect_err(|err| self.update_error_metrics("DeleteObject", err))
                    .context("deleting a delete marker")?;
                assert!(
                    deleted.delete_marker().unwrap_or(false),
                    "deleting a delete marker"
                );
            } else {
                let has_current_version = list_res
                    .versions()
                    .into_iter()
                    .filter(|d| d.key() == Some(path.as_str()))
                    .any(|v| v.is_latest().unwrap_or(false));

                if !has_current_version {
                    return Err(Determinate::new(anyhow!(
                        "unable to restore {key} in s3: no valid version exists"
                    ))
                    .into());
                }
                return Ok(());
            }
        }
    }
}

impl S3Blob {
    async fn set_single_part(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
        let start_overall = Instant::now();
        let path = self.get_path(key);

        let value_len = value.len();
        let part_span = trace_span!("s3set_single", payload_len = value_len);
        self.metrics.set_single.inc();
        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(path)
            .body(ByteStream::from(value))
            .send()
            .instrument(part_span)
            .await
            .inspect_err(|err| self.update_error_metrics("PutObject", err))
            .context("set single part")?;
        debug!(
            "s3 PutObject single done {}b / {:?}",
            value_len,
            start_overall.elapsed()
        );
        Ok(())
    }

    // TODO(benesch): remove this once this function no longer makes use of
    // potentially dangerous `as` conversions.
    #[allow(clippy::as_conversions)]
    async fn set_multi_part(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
        let start_overall = Instant::now();
        let path = self.get_path(key);

        // Start the multi part request and get an upload id.
        trace!("s3 PutObject multi start {}b", value.len());
        self.metrics.set_multi_create.inc();
        let upload_res = self
            .client
            .create_multipart_upload()
            .bucket(&self.bucket)
            .key(&path)
            .customize()
            .mutate_request(|req| {
                // By default the Rust AWS SDK does not set the Content-Length
                // header on POST calls with empty bodies. This is fine for S3,
                // but when running against GCS's S3 interop mode these calls
                // will be rejected unless we set this header manually.
                req.headers_mut().insert("Content-Length", "0");
            })
            .send()
            .instrument(debug_span!("s3set_multi_start"))
            .await
            .inspect_err(|err| self.update_error_metrics("CreateMultipartUpload", err))
            .context("create_multipart_upload err")?;
        let upload_id = upload_res
            .upload_id()
            .ok_or_else(|| anyhow!("create_multipart_upload response missing upload_id"))?;
        trace!(
            "s3 create_multipart_upload took {:?}",
            start_overall.elapsed()
        );

        let async_runtime = AsyncHandle::try_current().map_err(anyhow::Error::new)?;

        // Fire off all the individual parts.
        //
        // TODO: The aws cli throttles how many of these are outstanding at any
        // given point. We'll likely want to do the same at some point.
        let start_parts = Instant::now();
        let mut part_futs = Vec::new();
        for (part_num, part_range) in self.multipart_config.part_iter(value.len()) {
            // NB: Without this spawn, these will execute serially. This is rust
            // async 101 stuff, but there isn't much async in the persist
            // codebase (yet?) so I thought it worth calling out.
            let part_span = debug_span!("s3set_multi_part", payload_len = part_range.len());
            let part_fut = async_runtime.spawn_named(
                // TODO: Add the key and part number once this can be annotated
                // with metadata.
                || "persist_s3blob_put_part",
                {
                    self.metrics.set_multi_part.inc();
                    self.client
                        .upload_part()
                        .bucket(&self.bucket)
                        .key(&path)
                        .upload_id(upload_id)
                        .part_number(part_num as i32)
                        .body(ByteStream::from(value.slice(part_range)))
                        .send()
                        .instrument(part_span)
                        .map(move |res| (start_parts.elapsed(), res))
                },
            );
            part_futs.push((part_num, part_fut));
        }
        let parts_len = part_futs.len();

        // Wait on all the parts to finish. This is done in part order, no need
        // for joining them in the order they finish.
        //
        // TODO: Consider using something like futures::future::join_all() for
        // this. That would cancel outstanding requests for us if any of them
        // fails. However, it might not play well with using retries for tail
        // latencies. Investigate.
        let min_part_elapsed = MinElapsed::default();
        let mut parts = Vec::with_capacity(parts_len);
        for (part_num, part_fut) in part_futs.into_iter() {
            let (this_part_elapsed, part_res) = part_fut
                .await
                .inspect_err(|_| {
                    self.metrics
                        .error_counts
                        .with_label_values(&["UploadPart", "AsyncSpawnError"])
                        .inc()
                })
                .map_err(|err| anyhow!(err).context("s3 spawn err"))?;
            let part_res = part_res
                .inspect_err(|err| self.update_error_metrics("UploadPart", err))
                .context("s3 upload_part err")?;
            let part_e_tag = part_res.e_tag().ok_or_else(|| {
                self.metrics
                    .error_counts
                    .with_label_values(&["UploadPart", "MissingEtag"])
                    .inc();
                anyhow!("s3 upload part missing e_tag")
            })?;
            parts.push(
                CompletedPart::builder()
                    .e_tag(part_e_tag)
                    .part_number(part_num as i32)
                    .build(),
            );
            min_part_elapsed.observe(this_part_elapsed, "s3 upload_part took");
        }
        trace!(
            "s3 upload_parts overall took {:?} ({} parts)",
            start_parts.elapsed(),
            parts_len
        );

        // Complete the upload.
        //
        // Currently, we early return if any of the individual parts fail. This
        // permanently orphans any parts that succeeded. One fix is to call
        // abort_multipart_upload, which deletes them. However, there's also an
        // option for an s3 bucket to auto-delete parts that haven't been
        // completed or aborted after a given amount of time. This latter is
        // simpler and also resilient to ill-timed mz restarts, so we use it for
        // now. We could likely add the accounting necessary to make
        // abort_multipart_upload work, but it would be complex and affect perf.
        // Let's see how far we can get without it.
        let start_complete = Instant::now();
        self.metrics.set_multi_complete.inc();
        self.client
            .complete_multipart_upload()
            .bucket(&self.bucket)
            .key(&path)
            .upload_id(upload_id)
            .multipart_upload(
                CompletedMultipartUpload::builder()
                    .set_parts(Some(parts))
                    .build(),
            )
            .send()
            .instrument(debug_span!("s3set_multi_complete", num_parts = parts_len))
            .await
            .inspect_err(|err| self.update_error_metrics("CompleteMultipartUpload", err))
            .context("complete_multipart_upload err")?;
        trace!(
            "s3 complete_multipart_upload took {:?}",
            start_complete.elapsed()
        );

        debug!(
            "s3 PutObject multi done {}b / {:?} ({} parts)",
            value.len(),
            start_overall.elapsed(),
            parts_len
        );
        Ok(())
    }

    fn update_error_metrics<E, R>(&self, op: &str, err: &SdkError<E, R>)
    where
        E: ProvideErrorMetadata,
    {
        let code = match err {
            SdkError::ServiceError(e) => match e.err().code() {
                Some(code) => code,
                None => "UnknownServiceError",
            },
            SdkError::DispatchFailure(e) => {
                if let Some(other_error) = e.as_other() {
                    match other_error {
                        aws_config::retry::ErrorKind::TransientError => "TransientError",
                        aws_config::retry::ErrorKind::ThrottlingError => "ThrottlingError",
                        aws_config::retry::ErrorKind::ServerError => "ServerError",
                        aws_config::retry::ErrorKind::ClientError => "ClientError",
                        _ => "UnknownDispatchFailure",
                    }
                } else if e.is_timeout() {
                    "TimeoutError"
                } else if e.is_io() {
                    "IOError"
                } else if e.is_user() {
                    "UserError"
                } else {
                    "UnknownDispatchFailure"
                }
            }
            SdkError::ResponseError(_) => "ResponseError",
            SdkError::ConstructionFailure(_) => "ConstructionFailure",
            // There is some overlap with MetricsSleep. MetricsSleep is more granular
            // but does not contain the operation.
            SdkError::TimeoutError(_) => "TimeoutError",
            // Catch-all for error variants added in a future version of the SDK.
            _ => "UnknownSdkError",
        };
        self.metrics
            .error_counts
            .with_label_values(&[op, code])
            .inc();
    }
}

#[derive(Clone, Debug)]
struct MultipartConfig {
    multipart_threshold: usize,
    multipart_chunk_size: usize,
}

impl Default for MultipartConfig {
    fn default() -> Self {
        Self {
            multipart_threshold: Self::DEFAULT_MULTIPART_THRESHOLD,
            multipart_chunk_size: Self::DEFAULT_MULTIPART_CHUNK_SIZE,
        }
    }
}

const MB: usize = 1024 * 1024;
const TB: usize = 1024 * 1024 * MB;

impl MultipartConfig {
    /// The minimum object size for which we start using multipart upload.
    ///
    /// From the official `aws cli` tool implementation:
    ///
    /// <https://github.com/aws/aws-cli/blob/2.4.14/awscli/customizations/s3/transferconfig.py#L18-L29>
    const DEFAULT_MULTIPART_THRESHOLD: usize = 8 * MB;
    /// The size of each part (except the last) in a multipart upload.
    ///
    /// From the official `aws cli` tool implementation:
    ///
    /// <https://github.com/aws/aws-cli/blob/2.4.14/awscli/customizations/s3/transferconfig.py#L18-L29>
    const DEFAULT_MULTIPART_CHUNK_SIZE: usize = 8 * MB;

    /// The largest size object creatable in S3.
    ///
    /// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
    const MAX_SINGLE_UPLOAD_SIZE: usize = 5 * TB;
    /// The minimum size of a part in a multipart upload.
    ///
    /// This minimum doesn't apply to the last chunk, which can be any size.
    ///
    /// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
    const MIN_UPLOAD_CHUNK_SIZE: usize = 5 * MB;
    /// The smallest allowable part number (inclusive).
    ///
    /// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
    const MIN_PART_NUM: u32 = 1;
    /// The largest allowable part number (inclusive).
    ///
    /// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
    const MAX_PART_NUM: u32 = 10_000;

    fn should_multipart(&self, blob_len: usize) -> Result<bool, String> {
        if blob_len > Self::MAX_SINGLE_UPLOAD_SIZE {
            return Err(format!(
                "S3 does not support blobs larger than {} bytes got: {}",
                Self::MAX_SINGLE_UPLOAD_SIZE,
                blob_len
            ));
        }
        Ok(blob_len > self.multipart_threshold)
    }

    fn part_iter(&self, blob_len: usize) -> MultipartChunkIter {
        debug_assert!(self.multipart_chunk_size >= MultipartConfig::MIN_UPLOAD_CHUNK_SIZE);
        MultipartChunkIter::new(self.multipart_chunk_size, blob_len)
    }
}

#[derive(Clone, Debug)]
struct MultipartChunkIter {
    total_len: usize,
    part_size: usize,
    part_idx: u32,
}

impl MultipartChunkIter {
    fn new(default_part_size: usize, blob_len: usize) -> Self {
        let max_parts: usize = usize::cast_from(MultipartConfig::MAX_PART_NUM);

        // Compute the minimum part size we can use without going over the max
        // number of parts that S3 allows: `ceil(blob_len / max_parts)`. This
        // will end up getting thrown away by the `cmp::max` for anything
        // smaller than `max_parts * default_part_size = 80GiB`.
        let min_part_size = (blob_len + max_parts - 1) / max_parts;
        let part_size = cmp::max(min_part_size, default_part_size);

        // Part nums are 1-indexed in S3. Convert back to 0-indexed to make the
        // range math easier to follow.
        let part_idx = MultipartConfig::MIN_PART_NUM - 1;
        MultipartChunkIter {
            total_len: blob_len,
            part_size,
            part_idx,
        }
    }
}

impl Iterator for MultipartChunkIter {
    type Item = (u32, Range<usize>);

    fn next(&mut self) -> Option<Self::Item> {
        let part_idx = self.part_idx;
        self.part_idx += 1;

        let start = usize::cast_from(part_idx) * self.part_size;
        if start >= self.total_len {
            return None;
        }
        let end = cmp::min(start + self.part_size, self.total_len);
        let part_num = part_idx + 1;
        Some((part_num, start..end))
    }
}

/// A helper for tracking the minimum of a set of Durations.
#[derive(Debug)]
struct MinElapsed {
    min: AtomicU64,
    alert_factor: u64,
}

impl Default for MinElapsed {
    fn default() -> Self {
        MinElapsed {
            min: AtomicU64::new(u64::MAX),
            alert_factor: 8,
        }
    }
}

impl MinElapsed {
    fn observe(&self, x: Duration, msg: &'static str) {
        let nanos = x.as_nanos();
        let nanos = u64::try_from(nanos).unwrap_or(u64::MAX);

        // Possibly set a new minimum.
        let prev_min = self.min.fetch_min(nanos, atomic::Ordering::SeqCst);

        // Trace if our provided duration was much larger than our minimum.
        let new_min = std::cmp::min(prev_min, nanos);
        if nanos > new_min.saturating_mul(self.alert_factor) {
            let min_duration = Duration::from_nanos(new_min);
            let factor = self.alert_factor;
            debug!("{msg} took {x:?}, more than {factor}x the min {min_duration:?}");
        } else {
            trace!("{msg} took {x:?}");
        }
    }
}

// Make sure the "vendored" feature of the openssl_sys crate makes it into the
// transitive dep graph of persist, so that we don't attempt to link against the
// system OpenSSL library. Fake a usage of the crate here so that a good
// samaritan doesn't remove our unused dep.
#[allow(dead_code)]
fn openssl_sys_hack() {
    openssl_sys::init();
}

#[cfg(test)]
mod tests {
    use tracing::info;

    use crate::location::tests::blob_impl_test;

    use super::*;

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(coverage, ignore)] // https://github.com/MaterializeInc/database-issues/issues/5586
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_method` on OS `linux`
    #[ignore] // TODO: Reenable against minio so it can run locally
    async fn s3_blob() -> Result<(), ExternalError> {
        let config = match S3BlobConfig::new_for_test().await? {
            Some(client) => client,
            None => {
                info!(
                    "{} env not set: skipping test that uses external service",
                    S3BlobConfig::EXTERNAL_TESTS_S3_BUCKET
                );
                return Ok(());
            }
        };
        let config_multipart = config.clone_with_new_uuid_prefix();

        blob_impl_test(move |path| {
            let path = path.to_owned();
            let config = config.clone();
            async move {
                let config = S3BlobConfig {
                    metrics: config.metrics.clone(),
                    client: config.client.clone(),
                    bucket: config.bucket.clone(),
                    prefix: format!("{}/s3_blob_impl_test/{}", config.prefix, path),
                    cfg: Arc::new(
                        ConfigSet::default()
                            .add(&ENABLE_S3_LGALLOC_CC_SIZES)
                            .add(&ENABLE_S3_LGALLOC_NONCC_SIZES),
                    ),
                    is_cc_active: true,
                };
                let mut blob = S3Blob::open(config).await?;
                blob.max_keys = 2;
                Ok(blob)
            }
        })
        .await?;

        // Also specifically test multipart. S3 requires all parts but the last
        // to be at least 5MB, which we don't want to do from a test, so this
        // uses the multipart code path but only writes a single part.
        {
            let blob = S3Blob::open(config_multipart).await?;
            blob.set_multi_part("multipart", "foobar".into()).await?;
            assert_eq!(
                blob.get("multipart").await?,
                Some(b"foobar".to_vec().into())
            );
        }

        Ok(())
    }
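
    // A minimal sanity check (a sketch, not exhaustive) that `MinElapsed`
    // keeps the smallest duration it has observed.
    #[mz_ore::test]
    fn min_elapsed_tracks_minimum() {
        let tracker = MinElapsed::default();
        tracker.observe(Duration::from_millis(100), "first");
        tracker.observe(Duration::from_millis(10), "second");
        tracker.observe(Duration::from_millis(50), "third");
        let expected = u64::try_from(Duration::from_millis(10).as_nanos()).expect("fits in u64");
        assert_eq!(tracker.min.load(atomic::Ordering::SeqCst), expected);
    }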

    #[mz_ore::test]
    fn should_multipart() {
        let config = MultipartConfig::default();
        assert_eq!(config.should_multipart(0), Ok(false));
        assert_eq!(config.should_multipart(1), Ok(false));
        assert_eq!(
            config.should_multipart(MultipartConfig::DEFAULT_MULTIPART_THRESHOLD),
            Ok(false)
        );
        assert_eq!(
            config.should_multipart(MultipartConfig::DEFAULT_MULTIPART_THRESHOLD + 1),
            Ok(true)
        );
        assert_eq!(
            config.should_multipart(MultipartConfig::DEFAULT_MULTIPART_THRESHOLD * 2),
            Ok(true)
        );
        assert_eq!(
            config.should_multipart(MultipartConfig::MAX_SINGLE_UPLOAD_SIZE),
            Ok(true)
        );
        assert_eq!(
            config.should_multipart(MultipartConfig::MAX_SINGLE_UPLOAD_SIZE + 1),
            Err(
                "S3 does not support blobs larger than 5497558138880 bytes got: 5497558138881"
                    .into()
            )
        );
    }
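
    // A small sketch of how the lgalloc dyncfg flags behave when freshly
    // registered: `ENABLE_S3_LGALLOC_CC_SIZES` defaults to on and
    // `ENABLE_S3_LGALLOC_NONCC_SIZES` defaults to off, which is what the
    // fetch path in `get` keys off of.
    #[mz_ore::test]
    fn lgalloc_flag_defaults() {
        let cfg = ConfigSet::default()
            .add(&ENABLE_S3_LGALLOC_CC_SIZES)
            .add(&ENABLE_S3_LGALLOC_NONCC_SIZES);
        assert!(ENABLE_S3_LGALLOC_CC_SIZES.get(&cfg));
        assert!(!ENABLE_S3_LGALLOC_NONCC_SIZES.get(&cfg));
    }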

    #[mz_ore::test]
    fn multipart_iter() {
        let iter = MultipartChunkIter::new(10, 0);
        assert_eq!(iter.collect::<Vec<_>>(), vec![]);

        let iter = MultipartChunkIter::new(10, 9);
        assert_eq!(iter.collect::<Vec<_>>(), vec![(1, 0..9)]);

        let iter = MultipartChunkIter::new(10, 10);
        assert_eq!(iter.collect::<Vec<_>>(), vec![(1, 0..10)]);

        let iter = MultipartChunkIter::new(10, 11);
        assert_eq!(iter.collect::<Vec<_>>(), vec![(1, 0..10), (2, 10..11)]);

        let iter = MultipartChunkIter::new(10, 19);
        assert_eq!(iter.collect::<Vec<_>>(), vec![(1, 0..10), (2, 10..19)]);

        let iter = MultipartChunkIter::new(10, 20);
        assert_eq!(iter.collect::<Vec<_>>(), vec![(1, 0..10), (2, 10..20)]);

        let iter = MultipartChunkIter::new(10, 21);
        assert_eq!(
            iter.collect::<Vec<_>>(),
            vec![(1, 0..10), (2, 10..20), (3, 20..21)]
        );
    }
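
    // A sketch of the part-size scaling described in `MultipartChunkIter::new`:
    // once `ceil(blob_len / MAX_PART_NUM)` exceeds the default part size, the
    // part size grows so the part count never exceeds S3's MAX_PART_NUM limit.
    // The tiny sizes here are illustrative only.
    #[mz_ore::test]
    fn multipart_part_size_scaling() {
        // Small blobs use the default part size as-is.
        let iter = MultipartChunkIter::new(10, 100);
        assert_eq!(iter.part_size, 10);

        // A blob needing more than MAX_PART_NUM parts at the default size gets
        // a bumped-up part size instead: ceil(100_001 / 10_000) = 11.
        let blob_len = 10 * usize::cast_from(MultipartConfig::MAX_PART_NUM) + 1;
        let iter = MultipartChunkIter::new(10, blob_len);
        assert_eq!(iter.part_size, 11);
        assert!(iter.count() <= usize::cast_from(MultipartConfig::MAX_PART_NUM));
    }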
}