use std::cmp;
use std::fmt::{Debug, Formatter};
use std::ops::Range;
use std::sync::Arc;
use std::sync::atomic::{self, AtomicU64};
use std::time::{Duration, Instant};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use aws_config::sts::AssumeRoleProvider;
use aws_config::timeout::TimeoutConfig;
use aws_credential_types::Credentials;
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::config::{AsyncSleep, Sleep};
use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
use aws_types::region::Region;
use bytes::Bytes;
use futures_util::stream::FuturesOrdered;
use futures_util::{FutureExt, StreamExt};
use mz_dyncfg::{Config, ConfigSet};
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::CastFrom;
use mz_ore::lgbytes::MetricsRegion;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::task::RuntimeExt;
use tokio::runtime::Handle as AsyncHandle;
use tracing::{Instrument, debug, debug_span, trace, trace_span};
use uuid::Uuid;

use crate::cfg::BlobKnobs;
use crate::error::Error;
use crate::location::{Blob, BlobMetadata, Determinate, ExternalError};
use crate::metrics::S3BlobMetrics;

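/// Configuration for opening an [S3Blob].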
#[derive(Clone, Debug)]
pub struct S3BlobConfig {
    metrics: S3BlobMetrics,
    client: S3Client,
    bucket: String,
    prefix: String,
    cfg: Arc<ConfigSet>,
    is_cc_active: bool,
}

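// Sentinel timeout values passed to the SDK's `TimeoutConfig` below. The SDK
// only accepts fixed durations, so we hand it these recognizable markers and
// have `MetricsSleep` substitute the current values from `BlobKnobs` whenever
// it is asked to sleep for one of them.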
const OPERATION_TIMEOUT_MARKER: Duration = Duration::new(111, 1111);
const OPERATION_ATTEMPT_TIMEOUT_MARKER: Duration = Duration::new(222, 2222);
const CONNECT_TIMEOUT_MARKER: Duration = Duration::new(333, 3333);
const READ_TIMEOUT_MARKER: Duration = Duration::new(444, 4444);

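/// An [AsyncSleep] impl that recognizes the marker durations above, sleeps for
/// the corresponding duration from [BlobKnobs] instead, and increments the
/// matching timeout metric if that sleep completes (i.e. the timeout fired).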
#[derive(Debug)]
struct MetricsSleep {
    knobs: Box<dyn BlobKnobs>,
    metrics: S3BlobMetrics,
}

impl AsyncSleep for MetricsSleep {
    fn sleep(&self, duration: Duration) -> Sleep {
        let (duration, metric) = match duration {
            OPERATION_TIMEOUT_MARKER => (
                self.knobs.operation_timeout(),
                Some(self.metrics.operation_timeouts.clone()),
            ),
            OPERATION_ATTEMPT_TIMEOUT_MARKER => (
                self.knobs.operation_attempt_timeout(),
                Some(self.metrics.operation_attempt_timeouts.clone()),
            ),
            CONNECT_TIMEOUT_MARKER => (
                self.knobs.connect_timeout(),
                Some(self.metrics.connect_timeouts.clone()),
            ),
            READ_TIMEOUT_MARKER => (
                self.knobs.read_timeout(),
                Some(self.metrics.read_timeouts.clone()),
            ),
            duration => (duration, None),
        };

        Sleep::new(tokio::time::sleep(duration).map(|x| {
            if let Some(counter) = metric {
                counter.inc();
            }
            x
        }))
    }
}

impl S3BlobConfig {
    const EXTERNAL_TESTS_S3_BUCKET: &'static str = "MZ_PERSIST_EXTERNAL_STORAGE_TEST_S3_BUCKET";

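    /// Returns a new [S3BlobConfig] for use in production.
    ///
    /// Stores objects in the given bucket under `prefix`. By default the
    /// region and credentials come from the ambient AWS environment; an
    /// explicit `region`, `endpoint`, assumable `role_arn`, or static
    /// `credentials` each override the corresponding default when provided.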
    pub async fn new(
        bucket: String,
        prefix: String,
        role_arn: Option<String>,
        endpoint: Option<String>,
        region: Option<String>,
        credentials: Option<(String, String)>,
        knobs: Box<dyn BlobKnobs>,
        metrics: S3BlobMetrics,
        cfg: Arc<ConfigSet>,
    ) -> Result<Self, Error> {
        let is_cc_active = knobs.is_cc_active();
        let mut loader = mz_aws_util::defaults();

        if let Some(region) = region {
            loader = loader.region(Region::new(region));
        }

        if let Some(role_arn) = role_arn {
            let assume_role_sdk_config = mz_aws_util::defaults().load().await;
            let role_provider = AssumeRoleProvider::builder(role_arn)
                .configure(&assume_role_sdk_config)
                .session_name("persist")
                .build()
                .await;
            loader = loader.credentials_provider(role_provider);
        }

        if let Some((access_key_id, secret_access_key)) = credentials {
            loader = loader.credentials_provider(Credentials::from_keys(
                access_key_id,
                secret_access_key,
                None,
            ));
        }

        if let Some(endpoint) = endpoint {
            loader = loader.endpoint_url(endpoint)
        }

        loader = loader.sleep_impl(MetricsSleep {
            knobs,
            metrics: metrics.clone(),
        });
        loader = loader.timeout_config(
            TimeoutConfig::builder()
                .operation_timeout(OPERATION_TIMEOUT_MARKER)
                .operation_attempt_timeout(OPERATION_ATTEMPT_TIMEOUT_MARKER)
                .connect_timeout(CONNECT_TIMEOUT_MARKER)
                .read_timeout(READ_TIMEOUT_MARKER)
                .build(),
        );

        let client = mz_aws_util::s3::new_client(&loader.load().await);
        Ok(S3BlobConfig {
            metrics,
            client,
            bucket,
            prefix,
            cfg,
            is_cc_active,
        })
    }

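    /// Returns a new [S3BlobConfig] for use in unit tests, or `Ok(None)` if
    /// the bucket environment variable ([Self::EXTERNAL_TESTS_S3_BUCKET]) is
    /// not set (panicking instead if it is unset in CI). Each call uses a
    /// fresh UUID prefix so concurrent runs do not interfere.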
    pub async fn new_for_test() -> Result<Option<Self>, Error> {
        let bucket = match std::env::var(Self::EXTERNAL_TESTS_S3_BUCKET) {
            Ok(bucket) => bucket,
            Err(_) => {
                if mz_ore::env::is_var_truthy("CI") {
                    panic!("CI is supposed to run this test but something has gone wrong!");
                }
                return Ok(None);
            }
        };

        struct TestBlobKnobs;
        impl std::fmt::Debug for TestBlobKnobs {
            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
                f.debug_struct("TestBlobKnobs").finish_non_exhaustive()
            }
        }
        impl BlobKnobs for TestBlobKnobs {
            fn operation_timeout(&self) -> Duration {
                OPERATION_TIMEOUT_MARKER
            }

            fn operation_attempt_timeout(&self) -> Duration {
                OPERATION_ATTEMPT_TIMEOUT_MARKER
            }

            fn connect_timeout(&self) -> Duration {
                CONNECT_TIMEOUT_MARKER
            }

            fn read_timeout(&self) -> Duration {
                READ_TIMEOUT_MARKER
            }

            fn is_cc_active(&self) -> bool {
                false
            }
        }

        let prefix = Uuid::new_v4().to_string();
        let role_arn = None;
        let metrics = S3BlobMetrics::new(&MetricsRegistry::new());
        let config = S3BlobConfig::new(
            bucket,
            prefix,
            role_arn,
            None,
            None,
            None,
            Box::new(TestBlobKnobs),
            metrics,
            Arc::new(
                ConfigSet::default()
                    .add(&ENABLE_S3_LGALLOC_CC_SIZES)
                    .add(&ENABLE_S3_LGALLOC_NONCC_SIZES),
            ),
        )
        .await?;
        Ok(Some(config))
    }

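    /// Returns a clone of `self` with a freshly generated UUID prefix.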
    pub fn clone_with_new_uuid_prefix(&self) -> Self {
        let mut ret = self.clone();
        ret.prefix = Uuid::new_v4().to_string();
        ret
    }
}

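/// Implementation of [Blob] backed by AWS S3.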
#[derive(Debug)]
pub struct S3Blob {
    metrics: S3BlobMetrics,
    client: S3Client,
    bucket: String,
    prefix: String,
    /// Maximum number of keys to request per list-objects call.
    max_keys: i32,
    multipart_config: MultipartConfig,
    cfg: Arc<ConfigSet>,
    is_cc_active: bool,
}

impl S3Blob {
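    /// Opens an [S3Blob] at the location described by `config`, issuing a
    /// test read to verify connectivity before returning.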
    pub async fn open(config: S3BlobConfig) -> Result<Self, ExternalError> {
        let ret = S3Blob {
            metrics: config.metrics,
            client: config.client,
            bucket: config.bucket,
            prefix: config.prefix,
            max_keys: 1_000,
            multipart_config: MultipartConfig::default(),
            cfg: config.cfg,
            is_cc_active: config.is_cc_active,
        };
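        // Issue a get of an arbitrary key before returning success. The result
        // is ignored; this only verifies that the connection and credentials
        // work.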
        let _ = ret.get("HEALTH_CHECK").await?;
        Ok(ret)
    }

    fn get_path(&self, key: &str) -> String {
        format!("{}/{}", self.prefix, key)
    }
}

pub(crate) const ENABLE_S3_LGALLOC_CC_SIZES: Config<bool> = Config::new(
    "persist_enable_s3_lgalloc_cc_sizes",
    true,
    "An incident flag to disable copying fetched s3 data into lgalloc on cc sized clusters.",
);

pub(crate) const ENABLE_S3_LGALLOC_NONCC_SIZES: Config<bool> = Config::new(
    "persist_enable_s3_lgalloc_noncc_sizes",
    false,
    "A feature flag to enable copying fetched s3 data into lgalloc on non-cc sized clusters.",
);

#[async_trait]
impl Blob for S3Blob {
    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
        let start_overall = Instant::now();
        let path = self.get_path(key);

        let min_body_elapsed = Arc::new(MinElapsed::default());
        let min_header_elapsed = Arc::new(MinElapsed::default());
        self.metrics.get_part.inc();

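        // Fetch part 1 of the object up front. For multipart-uploaded objects,
        // its response also tells us how many parts exist, so the remaining
        // parts can be fetched concurrently below.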
        let header_start = Instant::now();
        let object = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(&path)
            .part_number(1)
            .send()
            .await;
        let elapsed = header_start.elapsed();
        min_header_elapsed.observe(elapsed, "s3 download first part header");

        let first_part = match object {
            Ok(object) => object,
            Err(SdkError::ServiceError(err)) if err.err().is_no_such_key() => return Ok(None),
            Err(err) => {
                self.update_error_metrics("GetObject", &err);
                Err(anyhow!(err).context("s3 get meta err"))?
            }
        };

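        // parts_count is None for objects that were not uploaded with
        // multipart upload; treat those as a single part.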
        let num_parts = match first_part.parts_count() {
            None => 1,
            Some(parts @ 1..) => parts,
            Some(bad) => {
                assert!(bad <= 0);
                return Err(anyhow!("unexpected number of s3 object parts: {}", bad).into());
            }
        };

        trace!(
            "s3 download first header took {:?} ({num_parts} parts)",
            start_overall.elapsed(),
        );

        let mut body_futures = FuturesOrdered::new();
        let mut first_part = Some(first_part);

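        // Fetch the remaining parts concurrently; FuturesOrdered keeps the
        // bodies in part order.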
        for part_num in 1..=num_parts {
            let min_header_elapsed = Arc::clone(&min_header_elapsed);
            let min_body_elapsed = Arc::clone(&min_body_elapsed);
            let get_invalid_resp = self.metrics.get_invalid_resp.clone();
            let first_part = first_part.take();
            let path = &path;
            let request_future = async move {
                let mut object = match first_part {
                    Some(first_part) => {
                        assert_eq!(part_num, 1, "only the first part should be prefetched");
                        first_part
                    }
                    None => {
                        assert_ne!(part_num, 1, "first part should be prefetched");
                        let header_start = Instant::now();
                        let object = self
                            .client
                            .get_object()
                            .bucket(&self.bucket)
                            .key(path)
                            .part_number(part_num)
                            .send()
                            .await
                            .inspect_err(|err| self.update_error_metrics("GetObject", err))
                            .context("s3 get meta err")?;
                        min_header_elapsed
                            .observe(header_start.elapsed(), "s3 download part header");
                        object
                    }
                };

                let body_start = Instant::now();
                let mut body_parts: Vec<Bytes> = Vec::new();

                let enable_s3_lgalloc = if self.is_cc_active {
                    ENABLE_S3_LGALLOC_CC_SIZES.get(&self.cfg)
                } else {
                    ENABLE_S3_LGALLOC_NONCC_SIZES.get(&self.cfg)
                };

                let mut buffer = match object.content_length() {
                    Some(len @ 1..) => {
                        let len: u64 = len.try_into().expect("positive integer");
                        let buf: MetricsRegion<u8> = self
                            .metrics
                            .lgbytes
                            .persist_s3
                            .new_region(usize::cast_from(len));
                        Some(buf)
                    }
                    Some(len @ ..=-1) => {
                        tracing::trace!(?len, "found invalid content-length, falling back");
                        get_invalid_resp.inc();
                        None
                    }
                    Some(0) | None => None,
                };

                while let Some(data) = object.body.next().await {
                    let data = data.context("s3 get body err")?;
                    match &mut buffer {
                        Some(buf) => buf.extend_from_slice(&data[..]),
                        None if enable_s3_lgalloc => {
                            body_parts.push(self.metrics.lgbytes.persist_s3.try_mmap_bytes(data));
                        }
                        None => {
                            body_parts.push(data);
                        }
                    }
                }

                if let Some(body) = buffer {
                    assert!(body_parts.is_empty());
                    body_parts.push(body.into());
                }

                let body_elapsed = body_start.elapsed();
                min_body_elapsed.observe(body_elapsed, "s3 download part body");

                Ok::<_, anyhow::Error>(body_parts)
            };

            body_futures.push_back(request_future);
        }

        let mut segments = vec![];
        while let Some(result) = body_futures.next().await {
            let mut part_body = result
                .inspect_err(|e| {
                    self.metrics
                        .error_counts
                        .with_label_values(&["GetObjectStream", e.to_string().as_str()])
                        .inc()
                })
                .context("s3 get body err")?;

            segments.append(&mut part_body);
        }

        debug!(
            "s3 GetObject took {:?} ({} parts)",
            start_overall.elapsed(),
            num_parts
        );
        Ok(Some(SegmentedBytes::from(segments)))
    }

    async fn list_keys_and_metadata(
        &self,
        key_prefix: &str,
        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
    ) -> Result<(), ExternalError> {
        let mut continuation_token = None;
        let blob_key_prefix = self.get_path(key_prefix);
        let strippable_root_prefix = format!("{}/", self.prefix);

        loop {
            self.metrics.list_objects.inc();
            let resp = self
                .client
                .list_objects_v2()
                .bucket(&self.bucket)
                .prefix(&blob_key_prefix)
                .max_keys(self.max_keys)
                .set_continuation_token(continuation_token)
                .send()
                .await
                .inspect_err(|err| self.update_error_metrics("ListObjectsV2", err))
                .context("list bucket error")?;
            if let Some(contents) = resp.contents {
                for object in contents.iter() {
                    if let Some(key) = object.key.as_ref() {
                        if let Some(key) = key.strip_prefix(&strippable_root_prefix) {
                            let size_in_bytes = match object.size {
                                None => {
                                    return Err(ExternalError::from(anyhow!(
                                        "object missing size: {key}"
                                    )));
                                }
                                Some(size) => size
                                    .try_into()
                                    .expect("file in S3 cannot have negative size"),
                            };
                            f(BlobMetadata { key, size_in_bytes });
                        } else {
                            return Err(ExternalError::from(anyhow!(
                                "found key with invalid prefix: {}",
                                key
                            )));
                        }
                    }
                }
            }

            if resp.next_continuation_token.is_some() {
                continuation_token = resp.next_continuation_token;
            } else {
                break;
            }
        }

        Ok(())
    }

    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
        let value_len = value.len();
        if self
            .multipart_config
            .should_multipart(value_len)
            .map_err(anyhow::Error::msg)?
        {
            self.set_multi_part(key, value)
                .instrument(debug_span!("s3set_multi", payload_len = value_len))
                .await
        } else {
            self.set_single_part(key, value).await
        }
    }

    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
        let path = self.get_path(key);
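        // First HEAD the object to learn its size (and to return Ok(None) if
        // it doesn't exist), then issue the actual delete.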
        self.metrics.delete_head.inc();
        let head_res = self
            .client
            .head_object()
            .bucket(&self.bucket)
            .key(&path)
            .send()
            .await;
        let size_bytes = match head_res {
            Ok(x) => match x.content_length {
                None => {
                    return Err(ExternalError::from(anyhow!(
                        "s3 delete content length was none"
                    )));
                }
                Some(content_length) => {
                    u64::try_from(content_length).expect("file in S3 cannot have negative size")
                }
            },
            Err(SdkError::ServiceError(err)) if err.err().is_not_found() => return Ok(None),
            Err(err) => {
                self.update_error_metrics("HeadObject", &err);
                return Err(ExternalError::from(
                    anyhow!(err).context("s3 delete head err"),
                ));
            }
        };
        self.metrics.delete_object.inc();
        let _ = self
            .client
            .delete_object()
            .bucket(&self.bucket)
            .key(&path)
            .send()
            .await
            .inspect_err(|err| self.update_error_metrics("DeleteObject", err))
            .context("s3 delete object err")?;
        Ok(Some(usize::cast_from(size_bytes)))
    }

    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
        let path = self.get_path(key);
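        // Fetch the latest version information for this key in a loop. If the
        // current version is a delete marker, remove that marker (un-deleting
        // the object) and check again; otherwise confirm a live version
        // exists.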
        loop {
            let list_res = self
                .client
                .list_object_versions()
                .bucket(&self.bucket)
                .prefix(&path)
                .max_keys(1)
                .send()
                .await
                .inspect_err(|err| self.update_error_metrics("ListObjectVersions", err))
                .context("listing object versions during restore")?;

            let current_delete = list_res
                .delete_markers()
                .into_iter()
                .filter(|d| d.key() == Some(path.as_str()))
                .find(|d| d.is_latest().unwrap_or(false))
                .and_then(|d| d.version_id());

            if let Some(version) = current_delete {
                let deleted = self
                    .client
                    .delete_object()
                    .bucket(&self.bucket)
                    .key(&path)
                    .version_id(version)
                    .send()
                    .await
                    .inspect_err(|err| self.update_error_metrics("DeleteObject", err))
                    .context("deleting a delete marker")?;
                assert!(
                    deleted.delete_marker().unwrap_or(false),
                    "deleting a delete marker"
                );
            } else {
                let has_current_version = list_res
                    .versions()
                    .into_iter()
                    .filter(|d| d.key() == Some(path.as_str()))
                    .any(|v| v.is_latest().unwrap_or(false));

                if !has_current_version {
                    return Err(Determinate::new(anyhow!(
                        "unable to restore {key} in s3: no valid version exists"
                    ))
                    .into());
                }
                return Ok(());
            }
        }
    }
}

impl S3Blob {
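    /// Uploads a blob to S3 in a single PutObject request.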
    async fn set_single_part(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
        let start_overall = Instant::now();
        let path = self.get_path(key);

        let value_len = value.len();
        let part_span = trace_span!("s3set_single", payload_len = value_len);
        self.metrics.set_single.inc();
        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(path)
            .body(ByteStream::from(value))
            .send()
            .instrument(part_span)
            .await
            .inspect_err(|err| self.update_error_metrics("PutObject", err))
            .context("set single part")?;
        debug!(
            "s3 PutObject single done {}b / {:?}",
            value_len,
            start_overall.elapsed()
        );
        Ok(())
    }

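    /// Uploads a blob to S3 using the multipart upload API, splitting the
    /// value into chunks according to [MultipartConfig] and uploading them
    /// concurrently.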
    #[allow(clippy::as_conversions)]
    async fn set_multi_part(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
        let start_overall = Instant::now();
        let path = self.get_path(key);

        trace!("s3 PutObject multi start {}b", value.len());
        self.metrics.set_multi_create.inc();
        let upload_res = self
            .client
            .create_multipart_upload()
            .bucket(&self.bucket)
            .key(&path)
            .customize()
            .mutate_request(|req| {
                req.headers_mut().insert("Content-Length", "0");
            })
            .send()
            .instrument(debug_span!("s3set_multi_start"))
            .await
            .inspect_err(|err| self.update_error_metrics("CreateMultipartUpload", err))
            .context("create_multipart_upload err")?;
        let upload_id = upload_res
            .upload_id()
            .ok_or_else(|| anyhow!("create_multipart_upload response missing upload_id"))?;
        trace!(
            "s3 create_multipart_upload took {:?}",
            start_overall.elapsed()
        );

        let async_runtime = AsyncHandle::try_current().map_err(anyhow::Error::new)?;

        let start_parts = Instant::now();
        let mut part_futs = Vec::new();
        for (part_num, part_range) in self.multipart_config.part_iter(value.len()) {
            let part_span = debug_span!("s3set_multi_part", payload_len = part_range.len());
            let part_fut = async_runtime.spawn_named(
                || "persist_s3blob_put_part",
                {
                    self.metrics.set_multi_part.inc();
                    self.client
                        .upload_part()
                        .bucket(&self.bucket)
                        .key(&path)
                        .upload_id(upload_id)
                        .part_number(part_num as i32)
                        .body(ByteStream::from(value.slice(part_range)))
                        .send()
                        .instrument(part_span)
                        .map(move |res| (start_parts.elapsed(), res))
                },
            );
            part_futs.push((part_num, part_fut));
        }
        let parts_len = part_futs.len();

        let min_part_elapsed = MinElapsed::default();
        let mut parts = Vec::with_capacity(parts_len);
        for (part_num, part_fut) in part_futs.into_iter() {
            let (this_part_elapsed, part_res) = part_fut
                .await
                .inspect_err(|_| {
                    self.metrics
                        .error_counts
                        .with_label_values(&["UploadPart", "AsyncSpawnError"])
                        .inc()
                })
                .map_err(|err| anyhow!(err).context("s3 spawn err"))?;
            let part_res = part_res
                .inspect_err(|err| self.update_error_metrics("UploadPart", err))
                .context("s3 upload_part err")?;
            let part_e_tag = part_res.e_tag().ok_or_else(|| {
                self.metrics
                    .error_counts
                    .with_label_values(&["UploadPart", "MissingEtag"])
                    .inc();
                anyhow!("s3 upload part missing e_tag")
            })?;
            parts.push(
                CompletedPart::builder()
                    .e_tag(part_e_tag)
                    .part_number(part_num as i32)
                    .build(),
            );
            min_part_elapsed.observe(this_part_elapsed, "s3 upload_part took");
        }
        trace!(
            "s3 upload_parts overall took {:?} ({} parts)",
            start_parts.elapsed(),
            parts_len
        );

        let start_complete = Instant::now();
        self.metrics.set_multi_complete.inc();
        self.client
            .complete_multipart_upload()
            .bucket(&self.bucket)
            .key(&path)
            .upload_id(upload_id)
            .multipart_upload(
                CompletedMultipartUpload::builder()
                    .set_parts(Some(parts))
                    .build(),
            )
            .send()
            .instrument(debug_span!("s3set_multi_complete", num_parts = parts_len))
            .await
            .inspect_err(|err| self.update_error_metrics("CompleteMultipartUpload", err))
            .context("complete_multipart_upload err")?;
        trace!(
            "s3 complete_multipart_upload took {:?}",
            start_complete.elapsed()
        );

        debug!(
            "s3 PutObject multi done {}b / {:?} ({} parts)",
            value.len(),
            start_overall.elapsed(),
            parts_len
        );
        Ok(())
    }

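    /// Increments the error metric for `op`, labeled with a best-effort error
    /// code extracted from the given [SdkError].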
    fn update_error_metrics<E, R>(&self, op: &str, err: &SdkError<E, R>)
    where
        E: ProvideErrorMetadata,
    {
        let code = match err {
            SdkError::ServiceError(e) => match e.err().code() {
                Some(code) => code,
                None => "UnknownServiceError",
            },
            SdkError::DispatchFailure(e) => {
                if let Some(other_error) = e.as_other() {
                    match other_error {
                        aws_config::retry::ErrorKind::TransientError => "TransientError",
                        aws_config::retry::ErrorKind::ThrottlingError => "ThrottlingError",
                        aws_config::retry::ErrorKind::ServerError => "ServerError",
                        aws_config::retry::ErrorKind::ClientError => "ClientError",
                        _ => "UnknownDispatchFailure",
                    }
                } else if e.is_timeout() {
                    "TimeoutError"
                } else if e.is_io() {
                    "IOError"
                } else if e.is_user() {
                    "UserError"
                } else {
                    "UnknownDispatchFailure"
                }
            }
            SdkError::ResponseError(_) => "ResponseError",
            SdkError::ConstructionFailure(_) => "ConstructionFailure",
            SdkError::TimeoutError(_) => "TimeoutError",
            _ => "UnknownSdkError",
        };
        self.metrics
            .error_counts
            .with_label_values(&[op, code])
            .inc();
    }
}

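/// Configuration for when [S3Blob::set] falls back from a single PutObject to
/// a multipart upload, and for how multipart chunks are sized.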
#[derive(Clone, Debug)]
struct MultipartConfig {
    multipart_threshold: usize,
    multipart_chunk_size: usize,
}

impl Default for MultipartConfig {
    fn default() -> Self {
        Self {
            multipart_threshold: Self::DEFAULT_MULTIPART_THRESHOLD,
            multipart_chunk_size: Self::DEFAULT_MULTIPART_CHUNK_SIZE,
        }
    }
}

const MB: usize = 1024 * 1024;
const TB: usize = 1024 * 1024 * MB;

impl MultipartConfig {
    const DEFAULT_MULTIPART_THRESHOLD: usize = 8 * MB;
    const DEFAULT_MULTIPART_CHUNK_SIZE: usize = 8 * MB;

    const MAX_SINGLE_UPLOAD_SIZE: usize = 5 * TB;
    const MIN_UPLOAD_CHUNK_SIZE: usize = 5 * MB;
    const MIN_PART_NUM: u32 = 1;
    const MAX_PART_NUM: u32 = 10_000;

    fn should_multipart(&self, blob_len: usize) -> Result<bool, String> {
        if blob_len > Self::MAX_SINGLE_UPLOAD_SIZE {
            return Err(format!(
                "S3 does not support blobs larger than {} bytes got: {}",
                Self::MAX_SINGLE_UPLOAD_SIZE,
                blob_len
            ));
        }
        Ok(blob_len > self.multipart_threshold)
    }

    fn part_iter(&self, blob_len: usize) -> MultipartChunkIter {
        debug_assert!(self.multipart_chunk_size >= MultipartConfig::MIN_UPLOAD_CHUNK_SIZE);
        MultipartChunkIter::new(self.multipart_chunk_size, blob_len)
    }
}

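/// An iterator over the (1-based part number, byte range) of each chunk of a
/// multipart upload.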
#[derive(Clone, Debug)]
struct MultipartChunkIter {
    total_len: usize,
    part_size: usize,
    part_idx: u32,
}

impl MultipartChunkIter {
    fn new(default_part_size: usize, blob_len: usize) -> Self {
        let max_parts: usize = usize::cast_from(MultipartConfig::MAX_PART_NUM);

        // Ceiling division: the smallest part size that keeps us within S3's
        // maximum part count.
        let min_part_size = (blob_len + max_parts - 1) / max_parts;
        let part_size = cmp::max(min_part_size, default_part_size);

        let part_idx = MultipartConfig::MIN_PART_NUM - 1;
        MultipartChunkIter {
            total_len: blob_len,
            part_size,
            part_idx,
        }
    }
}

impl Iterator for MultipartChunkIter {
    type Item = (u32, Range<usize>);

    fn next(&mut self) -> Option<Self::Item> {
        let part_idx = self.part_idx;
        self.part_idx += 1;

        let start = usize::cast_from(part_idx) * self.part_size;
        if start >= self.total_len {
            return None;
        }
        let end = cmp::min(start + self.part_size, self.total_len);
        let part_num = part_idx + 1;
        Some((part_num, start..end))
    }
}

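/// Tracks the minimum of all durations it has observed and logs at debug level
/// any observation that exceeds that minimum by more than `alert_factor`.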
#[derive(Debug)]
struct MinElapsed {
    min: AtomicU64,
    alert_factor: u64,
}

impl Default for MinElapsed {
    fn default() -> Self {
        MinElapsed {
            min: AtomicU64::new(u64::MAX),
            alert_factor: 8,
        }
    }
}

impl MinElapsed {
    fn observe(&self, x: Duration, msg: &'static str) {
        let nanos = x.as_nanos();
        let nanos = u64::try_from(nanos).unwrap_or(u64::MAX);

        let prev_min = self.min.fetch_min(nanos, atomic::Ordering::SeqCst);

        let new_min = std::cmp::min(prev_min, nanos);
        if nanos > new_min.saturating_mul(self.alert_factor) {
            let min_duration = Duration::from_nanos(new_min);
            let factor = self.alert_factor;
            debug!("{msg} took {x:?} more than {factor}x the min {min_duration:?}");
        } else {
            trace!("{msg} took {x:?}");
        }
    }
}

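/// Never called directly; exists so that this crate retains a usage of
/// `openssl_sys` (presumably to keep the dependency linked).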
#[allow(dead_code)]
fn openssl_sys_hack() {
    openssl_sys::init();
}

#[cfg(test)]
mod tests {
    use tracing::info;

    use crate::location::tests::blob_impl_test;

    use super::*;

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(coverage, ignore)]
    #[cfg_attr(miri, ignore)]
    #[ignore]
    async fn s3_blob() -> Result<(), ExternalError> {
        let config = match S3BlobConfig::new_for_test().await? {
            Some(client) => client,
            None => {
                info!(
                    "{} env not set: skipping test that uses external service",
                    S3BlobConfig::EXTERNAL_TESTS_S3_BUCKET
                );
                return Ok(());
            }
        };
        let config_multipart = config.clone_with_new_uuid_prefix();

        blob_impl_test(move |path| {
            let path = path.to_owned();
            let config = config.clone();
            async move {
                let config = S3BlobConfig {
                    metrics: config.metrics.clone(),
                    client: config.client.clone(),
                    bucket: config.bucket.clone(),
                    prefix: format!("{}/s3_blob_impl_test/{}", config.prefix, path),
                    cfg: Arc::new(
                        ConfigSet::default()
                            .add(&ENABLE_S3_LGALLOC_CC_SIZES)
                            .add(&ENABLE_S3_LGALLOC_NONCC_SIZES),
                    ),
                    is_cc_active: true,
                };
                let mut blob = S3Blob::open(config).await?;
                blob.max_keys = 2;
                Ok(blob)
            }
        })
        .await?;

        {
            // Exercise the multipart upload path directly, even though this
            // payload is well below the multipart threshold.
            let blob = S3Blob::open(config_multipart).await?;
            blob.set_multi_part("multipart", "foobar".into()).await?;
            assert_eq!(
                blob.get("multipart").await?,
                Some(b"foobar".to_vec().into())
            );
        }

        Ok(())
    }

    #[mz_ore::test]
    fn should_multipart() {
        let config = MultipartConfig::default();
        assert_eq!(config.should_multipart(0), Ok(false));
        assert_eq!(config.should_multipart(1), Ok(false));
        assert_eq!(
            config.should_multipart(MultipartConfig::DEFAULT_MULTIPART_THRESHOLD),
            Ok(false)
        );
        assert_eq!(
            config.should_multipart(MultipartConfig::DEFAULT_MULTIPART_THRESHOLD + 1),
            Ok(true)
        );
        assert_eq!(
            config.should_multipart(MultipartConfig::DEFAULT_MULTIPART_THRESHOLD * 2),
            Ok(true)
        );
        assert_eq!(
            config.should_multipart(MultipartConfig::MAX_SINGLE_UPLOAD_SIZE),
            Ok(true)
        );
        assert_eq!(
            config.should_multipart(MultipartConfig::MAX_SINGLE_UPLOAD_SIZE + 1),
            Err(
                "S3 does not support blobs larger than 5497558138880 bytes got: 5497558138881"
                    .into()
            )
        );
    }

    #[mz_ore::test]
    fn multipart_iter() {
        let iter = MultipartChunkIter::new(10, 0);
        assert_eq!(iter.collect::<Vec<_>>(), vec![]);

        let iter = MultipartChunkIter::new(10, 9);
        assert_eq!(iter.collect::<Vec<_>>(), vec![(1, 0..9)]);

        let iter = MultipartChunkIter::new(10, 10);
        assert_eq!(iter.collect::<Vec<_>>(), vec![(1, 0..10)]);

        let iter = MultipartChunkIter::new(10, 11);
        assert_eq!(iter.collect::<Vec<_>>(), vec![(1, 0..10), (2, 10..11)]);

        let iter = MultipartChunkIter::new(10, 19);
        assert_eq!(iter.collect::<Vec<_>>(), vec![(1, 0..10), (2, 10..19)]);

        let iter = MultipartChunkIter::new(10, 20);
        assert_eq!(iter.collect::<Vec<_>>(), vec![(1, 0..10), (2, 10..20)]);

        let iter = MultipartChunkIter::new(10, 21);
        assert_eq!(
            iter.collect::<Vec<_>>(),
            vec![(1, 0..10), (2, 10..20), (3, 20..21)]
        );
    }
}