1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Write;
21use std::str::FromStr;
22use std::sync::Arc;
23use std::sync::LazyLock;
24use std::sync::atomic::AtomicBool;
25
26use base64::Engine;
27use base64::prelude::BASE64_STANDARD;
28use constants::X_AMZ_META_PREFIX;
29use constants::X_AMZ_VERSION_ID;
30use http::Response;
31use http::StatusCode;
32use log::debug;
33use log::warn;
34use md5::Digest;
35use md5::Md5;
36use reqsign::AwsAssumeRoleLoader;
37use reqsign::AwsConfig;
38use reqsign::AwsCredentialLoad;
39use reqsign::AwsDefaultLoader;
40use reqsign::AwsV4Signer;
41use reqwest::Url;
42
43use super::S3_SCHEME;
44use super::config::S3Config;
45use super::core::*;
46use super::deleter::S3Deleter;
47use super::error::parse_error;
48use super::lister::S3ListerV1;
49use super::lister::S3ListerV2;
50use super::lister::S3Listers;
51use super::lister::S3ObjectVersionsLister;
52use super::writer::S3Writer;
53use super::writer::S3Writers;
54use crate::raw::*;
55use crate::*;
56
/// Endpoint rewrite table.
///
/// Keys are fully normalized endpoint URLs; values are templates whose
/// `{region}` placeholder is substituted with the configured region when the
/// final endpoint is built.
static ENDPOINT_TEMPLATES: LazyLock<HashMap<&'static str, &'static str>> = LazyLock::new(|| {
    HashMap::from([(
        "https://s3.amazonaws.com",
        "https://s3.{region}.amazonaws.com",
    )])
});
67
/// Value reported for the `delete_max_size` capability when the builder does
/// not configure one explicitly.
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1_000;
69
70#[doc = include_str!("docs.md")]
73#[doc = include_str!("compatible_services.md")]
74#[derive(Default)]
75pub struct S3Builder {
76 pub(super) config: S3Config,
77
78 pub(super) customized_credential_load: Option<Box<dyn AwsCredentialLoad>>,
79
80 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
81 pub(super) http_client: Option<HttpClient>,
82}
83
84impl Debug for S3Builder {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 f.debug_struct("S3Builder")
87 .field("config", &self.config)
88 .finish_non_exhaustive()
89 }
90}
91
92impl S3Builder {
93 pub fn root(mut self, root: &str) -> Self {
97 self.config.root = if root.is_empty() {
98 None
99 } else {
100 Some(root.to_string())
101 };
102
103 self
104 }
105
106 pub fn bucket(mut self, bucket: &str) -> Self {
108 self.config.bucket = bucket.to_string();
109
110 self
111 }
112
113 pub fn endpoint(mut self, endpoint: &str) -> Self {
126 if !endpoint.is_empty() {
127 self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
129 }
130
131 self
132 }
133
134 pub fn region(mut self, region: &str) -> Self {
141 if !region.is_empty() {
142 self.config.region = Some(region.to_string())
143 }
144
145 self
146 }
147
148 pub fn access_key_id(mut self, v: &str) -> Self {
153 if !v.is_empty() {
154 self.config.access_key_id = Some(v.to_string())
155 }
156
157 self
158 }
159
160 pub fn secret_access_key(mut self, v: &str) -> Self {
165 if !v.is_empty() {
166 self.config.secret_access_key = Some(v.to_string())
167 }
168
169 self
170 }
171
172 pub fn role_arn(mut self, v: &str) -> Self {
177 if !v.is_empty() {
178 self.config.role_arn = Some(v.to_string())
179 }
180
181 self
182 }
183
184 pub fn external_id(mut self, v: &str) -> Self {
186 if !v.is_empty() {
187 self.config.external_id = Some(v.to_string())
188 }
189
190 self
191 }
192
193 pub fn role_session_name(mut self, v: &str) -> Self {
195 if !v.is_empty() {
196 self.config.role_session_name = Some(v.to_string())
197 }
198
199 self
200 }
201
202 pub fn default_storage_class(mut self, v: &str) -> Self {
215 if !v.is_empty() {
216 self.config.default_storage_class = Some(v.to_string())
217 }
218
219 self
220 }
221
222 pub fn server_side_encryption(mut self, v: &str) -> Self {
233 if !v.is_empty() {
234 self.config.server_side_encryption = Some(v.to_string())
235 }
236
237 self
238 }
239
240 pub fn server_side_encryption_aws_kms_key_id(mut self, v: &str) -> Self {
257 if !v.is_empty() {
258 self.config.server_side_encryption_aws_kms_key_id = Some(v.to_string())
259 }
260
261 self
262 }
263
264 pub fn server_side_encryption_customer_algorithm(mut self, v: &str) -> Self {
275 if !v.is_empty() {
276 self.config.server_side_encryption_customer_algorithm = Some(v.to_string())
277 }
278
279 self
280 }
281
282 pub fn server_side_encryption_customer_key(mut self, v: &str) -> Self {
296 if !v.is_empty() {
297 self.config.server_side_encryption_customer_key = Some(v.to_string())
298 }
299
300 self
301 }
302
303 pub fn server_side_encryption_customer_key_md5(mut self, v: &str) -> Self {
316 if !v.is_empty() {
317 self.config.server_side_encryption_customer_key_md5 = Some(v.to_string())
318 }
319
320 self
321 }
322
323 pub fn server_side_encryption_with_aws_managed_kms_key(mut self) -> Self {
329 self.config.server_side_encryption = Some("aws:kms".to_string());
330 self
331 }
332
333 pub fn server_side_encryption_with_customer_managed_kms_key(
339 mut self,
340 aws_kms_key_id: &str,
341 ) -> Self {
342 self.config.server_side_encryption = Some("aws:kms".to_string());
343 self.config.server_side_encryption_aws_kms_key_id = Some(aws_kms_key_id.to_string());
344 self
345 }
346
347 pub fn server_side_encryption_with_s3_key(mut self) -> Self {
353 self.config.server_side_encryption = Some("AES256".to_string());
354 self
355 }
356
357 pub fn server_side_encryption_with_customer_key(mut self, algorithm: &str, key: &[u8]) -> Self {
363 self.config.server_side_encryption_customer_algorithm = Some(algorithm.to_string());
364 self.config.server_side_encryption_customer_key = Some(BASE64_STANDARD.encode(key));
365 let key_md5 = Md5::digest(key);
366 self.config.server_side_encryption_customer_key_md5 = Some(BASE64_STANDARD.encode(key_md5));
367 self
368 }
369
370 pub fn session_token(mut self, token: &str) -> Self {
376 if !token.is_empty() {
377 self.config.session_token = Some(token.to_string());
378 }
379 self
380 }
381
382 pub fn disable_config_load(mut self) -> Self {
390 self.config.disable_config_load = true;
391 self
392 }
393
394 pub fn disable_list_objects_v2(mut self) -> Self {
400 self.config.disable_list_objects_v2 = true;
401 self
402 }
403
404 pub fn enable_request_payer(mut self) -> Self {
408 self.config.enable_request_payer = true;
409 self
410 }
411
412 pub fn disable_ec2_metadata(mut self) -> Self {
417 self.config.disable_ec2_metadata = true;
418 self
419 }
420
421 pub fn allow_anonymous(mut self) -> Self {
424 self.config.allow_anonymous = true;
425 self
426 }
427
428 pub fn enable_virtual_host_style(mut self) -> Self {
434 self.config.enable_virtual_host_style = true;
435 self
436 }
437
438 pub fn disable_stat_with_override(mut self) -> Self {
442 self.config.disable_stat_with_override = true;
443 self
444 }
445
446 pub fn customized_credential_load(mut self, cred: Box<dyn AwsCredentialLoad>) -> Self {
451 self.customized_credential_load = Some(cred);
452 self
453 }
454
455 #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
462 #[allow(deprecated)]
463 pub fn http_client(mut self, client: HttpClient) -> Self {
464 self.http_client = Some(client);
465 self
466 }
467
468 pub fn enable_versioning(mut self, enabled: bool) -> Self {
470 self.config.enable_versioning = enabled;
471
472 self
473 }
474
475 fn is_bucket_valid(&self) -> bool {
479 if self.config.bucket.is_empty() {
480 return false;
481 }
482 if self.config.enable_virtual_host_style && self.config.bucket.contains('.') {
486 return false;
487 }
488 true
489 }
490
491 fn build_endpoint(&self, region: &str) -> String {
493 let bucket = {
494 debug_assert!(self.is_bucket_valid(), "bucket must be valid");
495
496 self.config.bucket.as_str()
497 };
498
499 let mut endpoint = match &self.config.endpoint {
500 Some(endpoint) => {
501 if endpoint.starts_with("http") {
502 endpoint.to_string()
503 } else {
504 format!("https://{endpoint}")
506 }
507 }
508 None => "https://s3.amazonaws.com".to_string(),
509 };
510
511 endpoint = endpoint.replace(&format!("//{bucket}."), "//");
513
514 if let Ok(url) = Url::from_str(&endpoint) {
516 endpoint = url.to_string().trim_end_matches('/').to_string();
518 }
519
520 endpoint = if let Some(template) = ENDPOINT_TEMPLATES.get(endpoint.as_str()) {
522 template.replace("{region}", region)
523 } else {
524 endpoint.to_string()
527 };
528
529 if self.config.enable_virtual_host_style {
531 endpoint = endpoint.replace("//", &format!("//{bucket}."))
532 } else {
533 write!(endpoint, "/{bucket}").expect("write into string must succeed");
534 };
535
536 endpoint
537 }
538
539 #[deprecated(
541 since = "0.52.0",
542 note = "Please use `delete_max_size` instead of `batch_max_operations`"
543 )]
544 pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
545 self.config.delete_max_size = Some(batch_max_operations);
546
547 self
548 }
549
550 pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
552 self.config.delete_max_size = Some(delete_max_size);
553
554 self
555 }
556
557 pub fn checksum_algorithm(mut self, checksum_algorithm: &str) -> Self {
563 self.config.checksum_algorithm = Some(checksum_algorithm.to_string());
564
565 self
566 }
567
568 pub fn disable_write_with_if_match(mut self) -> Self {
570 self.config.disable_write_with_if_match = true;
571 self
572 }
573
574 pub fn enable_write_with_append(mut self) -> Self {
576 self.config.enable_write_with_append = true;
577 self
578 }
579
580 pub async fn detect_region(endpoint: &str, bucket: &str) -> Option<String> {
616 let endpoint = endpoint.trim_end_matches('/');
618
619 let mut endpoint = if endpoint.starts_with("http") {
621 endpoint.to_string()
622 } else {
623 format!("https://{endpoint}")
625 };
626
627 endpoint = endpoint.replace(&format!("//{bucket}."), "//");
629 let url = format!("{endpoint}/{bucket}");
630
631 debug!("detect region with url: {url}");
632
633 if endpoint.ends_with("r2.cloudflarestorage.com") {
639 return Some("auto".to_string());
640 }
641
642 if let Some(v) = endpoint.strip_prefix("https://s3.") {
644 if let Some(region) = v.strip_suffix(".amazonaws.com") {
645 return Some(region.to_string());
646 }
647 }
648
649 if let Some(v) = endpoint.strip_prefix("https://") {
654 if let Some(region) = v.strip_suffix(".aliyuncs.com") {
655 return Some(region.to_string());
656 }
657
658 if let Some(region) = v.strip_suffix("-internal.aliyuncs.com") {
659 return Some(region.to_string());
660 }
661 }
662
663 let req = http::Request::head(&url).body(Buffer::new()).ok()?;
665
666 let client = HttpClient::new().ok()?;
667 let res = client
668 .send(req)
669 .await
670 .map_err(|err| warn!("detect region failed for: {err:?}"))
671 .ok()?;
672
673 debug!(
674 "auto detect region got response: status {:?}, header: {:?}",
675 res.status(),
676 res.headers()
677 );
678
679 if let Some(header) = res.headers().get("x-amz-bucket-region") {
681 if let Ok(regin) = header.to_str() {
682 return Some(regin.to_string());
683 }
684 }
685
686 if res.status() == StatusCode::FORBIDDEN || res.status() == StatusCode::OK {
689 return Some("us-east-1".to_string());
690 }
691
692 None
693 }
694}
695
696impl Builder for S3Builder {
697 type Config = S3Config;
698
699 fn build(mut self) -> Result<impl Access> {
700 debug!("backend build started: {:?}", &self);
701
702 let root = normalize_root(&self.config.root.clone().unwrap_or_default());
703 debug!("backend use root {}", &root);
704
705 let bucket = if self.is_bucket_valid() {
707 Ok(&self.config.bucket)
708 } else {
709 Err(
710 Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
711 .with_context("service", S3_SCHEME),
712 )
713 }?;
714 debug!("backend use bucket {}", &bucket);
715
716 let default_storage_class = match &self.config.default_storage_class {
717 None => None,
718 Some(v) => Some(
719 build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?,
720 ),
721 };
722
723 let server_side_encryption = match &self.config.server_side_encryption {
724 None => None,
725 Some(v) => Some(
726 build_header_value(v)
727 .map_err(|err| err.with_context("key", "server_side_encryption"))?,
728 ),
729 };
730
731 let server_side_encryption_aws_kms_key_id =
732 match &self.config.server_side_encryption_aws_kms_key_id {
733 None => None,
734 Some(v) => Some(build_header_value(v).map_err(|err| {
735 err.with_context("key", "server_side_encryption_aws_kms_key_id")
736 })?),
737 };
738
739 let server_side_encryption_customer_algorithm =
740 match &self.config.server_side_encryption_customer_algorithm {
741 None => None,
742 Some(v) => Some(build_header_value(v).map_err(|err| {
743 err.with_context("key", "server_side_encryption_customer_algorithm")
744 })?),
745 };
746
747 let server_side_encryption_customer_key =
748 match &self.config.server_side_encryption_customer_key {
749 None => None,
750 Some(v) => Some(build_header_value(v).map_err(|err| {
751 err.with_context("key", "server_side_encryption_customer_key")
752 })?),
753 };
754
755 let server_side_encryption_customer_key_md5 =
756 match &self.config.server_side_encryption_customer_key_md5 {
757 None => None,
758 Some(v) => Some(build_header_value(v).map_err(|err| {
759 err.with_context("key", "server_side_encryption_customer_key_md5")
760 })?),
761 };
762
763 let checksum_algorithm = match self.config.checksum_algorithm.as_deref() {
764 Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
765 Some("md5") => Some(ChecksumAlgorithm::Md5),
766 None => None,
767 v => {
768 return Err(Error::new(
769 ErrorKind::ConfigInvalid,
770 format!("{v:?} is not a supported checksum_algorithm."),
771 ));
772 }
773 };
774
775 let mut cfg = AwsConfig::default();
777 if !self.config.disable_config_load {
778 #[cfg(not(target_arch = "wasm32"))]
779 {
780 cfg = cfg.from_profile();
781 cfg = cfg.from_env();
782 }
783 }
784
785 if let Some(ref v) = self.config.region {
786 cfg.region = Some(v.to_string());
787 }
788
789 if cfg.region.is_none() {
790 return Err(Error::new(
791 ErrorKind::ConfigInvalid,
792 "region is missing. Please find it by S3::detect_region() or set them in env.",
793 )
794 .with_operation("Builder::build")
795 .with_context("service", S3_SCHEME));
796 }
797
798 let region = cfg.region.to_owned().unwrap();
799 debug!("backend use region: {region}");
800
801 self.config.endpoint = self.config.endpoint.or_else(|| cfg.endpoint_url.clone());
803
804 let endpoint = self.build_endpoint(®ion);
806 debug!("backend use endpoint: {endpoint}");
807
808 if let Some(v) = self.config.access_key_id {
810 cfg.access_key_id = Some(v)
811 }
812 if let Some(v) = self.config.secret_access_key {
813 cfg.secret_access_key = Some(v)
814 }
815 if let Some(v) = self.config.session_token {
816 cfg.session_token = Some(v)
817 }
818
819 let mut loader: Option<Box<dyn AwsCredentialLoad>> = None;
820 if let Some(v) = self.customized_credential_load {
822 loader = Some(v);
823 }
824
825 if let Some(role_arn) = self.config.role_arn {
827 let default_loader =
829 AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg.clone());
830
831 let mut assume_role_cfg = AwsConfig {
833 region: Some(region.clone()),
834 role_arn: Some(role_arn),
835 external_id: self.config.external_id.clone(),
836 sts_regional_endpoints: "regional".to_string(),
837 ..Default::default()
838 };
839
840 if let Some(name) = self.config.role_session_name {
842 assume_role_cfg.role_session_name = name;
843 }
844
845 let assume_role_loader = AwsAssumeRoleLoader::new(
846 GLOBAL_REQWEST_CLIENT.clone().clone(),
847 assume_role_cfg,
848 Box::new(default_loader),
849 )
850 .map_err(|err| {
851 Error::new(
852 ErrorKind::ConfigInvalid,
853 "The assume_role_loader is misconfigured",
854 )
855 .with_context("service", S3_SCHEME)
856 .set_source(err)
857 })?;
858 loader = Some(Box::new(assume_role_loader));
859 }
860 let loader = match loader {
862 Some(v) => v,
863 None => {
864 let mut default_loader =
865 AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
866 if self.config.disable_ec2_metadata {
867 default_loader = default_loader.with_disable_ec2_metadata();
868 }
869
870 Box::new(default_loader)
871 }
872 };
873
874 let signer = AwsV4Signer::new("s3", ®ion);
875
876 let delete_max_size = self
877 .config
878 .delete_max_size
879 .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
880
881 Ok(S3Backend {
882 core: Arc::new(S3Core {
883 info: {
884 let am = AccessorInfo::default();
885 am.set_scheme(S3_SCHEME)
886 .set_root(&root)
887 .set_name(bucket)
888 .set_native_capability(Capability {
889 stat: true,
890 stat_with_if_match: true,
891 stat_with_if_none_match: true,
892 stat_with_if_modified_since: true,
893 stat_with_if_unmodified_since: true,
894 stat_with_override_cache_control: !self
895 .config
896 .disable_stat_with_override,
897 stat_with_override_content_disposition: !self
898 .config
899 .disable_stat_with_override,
900 stat_with_override_content_type: !self
901 .config
902 .disable_stat_with_override,
903 stat_with_version: self.config.enable_versioning,
904
905 read: true,
906 read_with_if_match: true,
907 read_with_if_none_match: true,
908 read_with_if_modified_since: true,
909 read_with_if_unmodified_since: true,
910 read_with_override_cache_control: true,
911 read_with_override_content_disposition: true,
912 read_with_override_content_type: true,
913 read_with_version: self.config.enable_versioning,
914
915 write: true,
916 write_can_empty: true,
917 write_can_multi: true,
918 write_can_append: self.config.enable_write_with_append,
919
920 write_with_cache_control: true,
921 write_with_content_type: true,
922 write_with_content_encoding: true,
923 write_with_if_match: !self.config.disable_write_with_if_match,
924 write_with_if_not_exists: true,
925 write_with_user_metadata: true,
926
927 write_multi_min_size: Some(5 * 1024 * 1024),
931 write_multi_max_size: if cfg!(target_pointer_width = "64") {
935 Some(5 * 1024 * 1024 * 1024)
936 } else {
937 Some(usize::MAX)
938 },
939
940 delete: true,
941 delete_max_size: Some(delete_max_size),
942 delete_with_version: self.config.enable_versioning,
943
944 copy: true,
945
946 list: true,
947 list_with_limit: true,
948 list_with_start_after: true,
949 list_with_recursive: true,
950 list_with_versions: self.config.enable_versioning,
951 list_with_deleted: self.config.enable_versioning,
952
953 presign: true,
954 presign_stat: true,
955 presign_read: true,
956 presign_write: true,
957
958 shared: true,
959
960 ..Default::default()
961 });
962
963 #[allow(deprecated)]
965 if let Some(client) = self.http_client {
966 am.update_http_client(|_| client);
967 }
968
969 am.into()
970 },
971 bucket: bucket.to_string(),
972 endpoint,
973 root,
974 server_side_encryption,
975 server_side_encryption_aws_kms_key_id,
976 server_side_encryption_customer_algorithm,
977 server_side_encryption_customer_key,
978 server_side_encryption_customer_key_md5,
979 default_storage_class,
980 allow_anonymous: self.config.allow_anonymous,
981 disable_list_objects_v2: self.config.disable_list_objects_v2,
982 enable_request_payer: self.config.enable_request_payer,
983 signer,
984 loader,
985 credential_loaded: AtomicBool::new(false),
986 checksum_algorithm,
987 }),
988 })
989 }
990}
991
992#[derive(Debug, Clone)]
994pub struct S3Backend {
995 core: Arc<S3Core>,
996}
997
998impl Access for S3Backend {
999 type Reader = HttpBody;
1000 type Writer = S3Writers;
1001 type Lister = S3Listers;
1002 type Deleter = oio::BatchDeleter<S3Deleter>;
1003
1004 fn info(&self) -> Arc<AccessorInfo> {
1005 self.core.info.clone()
1006 }
1007
1008 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
1009 let resp = self.core.s3_head_object(path, args).await?;
1010
1011 let status = resp.status();
1012
1013 match status {
1014 StatusCode::OK => {
1015 let headers = resp.headers();
1016 let mut meta = parse_into_metadata(path, headers)?;
1017
1018 let user_meta = parse_prefixed_headers(headers, X_AMZ_META_PREFIX);
1019 if !user_meta.is_empty() {
1020 meta = meta.with_user_metadata(user_meta);
1021 }
1022
1023 if let Some(v) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? {
1024 meta.set_version(v);
1025 }
1026
1027 Ok(RpStat::new(meta))
1028 }
1029 _ => Err(parse_error(resp)),
1030 }
1031 }
1032
1033 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
1034 let resp = self.core.s3_get_object(path, args.range(), &args).await?;
1035
1036 let status = resp.status();
1037 match status {
1038 StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
1039 Ok((RpRead::default(), resp.into_body()))
1040 }
1041 _ => {
1042 let (part, mut body) = resp.into_parts();
1043 let buf = body.to_buffer().await?;
1044 Err(parse_error(Response::from_parts(part, buf)))
1045 }
1046 }
1047 }
1048
1049 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
1050 let writer = S3Writer::new(self.core.clone(), path, args.clone());
1051
1052 let w = if args.append() {
1053 S3Writers::Two(oio::AppendWriter::new(writer))
1054 } else {
1055 S3Writers::One(oio::MultipartWriter::new(
1056 self.core.info.clone(),
1057 writer,
1058 args.concurrent(),
1059 ))
1060 };
1061
1062 Ok((RpWrite::default(), w))
1063 }
1064
1065 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
1066 Ok((
1067 RpDelete::default(),
1068 oio::BatchDeleter::new(S3Deleter::new(self.core.clone())),
1069 ))
1070 }
1071
1072 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
1073 let l = if args.versions() || args.deleted() {
1074 ThreeWays::Three(oio::PageLister::new(S3ObjectVersionsLister::new(
1075 self.core.clone(),
1076 path,
1077 args,
1078 )))
1079 } else if self.core.disable_list_objects_v2 {
1080 ThreeWays::One(oio::PageLister::new(S3ListerV1::new(
1081 self.core.clone(),
1082 path,
1083 args,
1084 )))
1085 } else {
1086 ThreeWays::Two(oio::PageLister::new(S3ListerV2::new(
1087 self.core.clone(),
1088 path,
1089 args,
1090 )))
1091 };
1092
1093 Ok((RpList::default(), l))
1094 }
1095
1096 async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
1097 let resp = self.core.s3_copy_object(from, to).await?;
1098
1099 let status = resp.status();
1100
1101 match status {
1102 StatusCode::OK => Ok(RpCopy::default()),
1103 _ => Err(parse_error(resp)),
1104 }
1105 }
1106
1107 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
1108 let (expire, op) = args.into_parts();
1109 let req = match op {
1111 PresignOperation::Stat(v) => self.core.s3_head_object_request(path, v),
1112 PresignOperation::Read(v) => {
1113 self.core
1114 .s3_get_object_request(path, BytesRange::default(), &v)
1115 }
1116 PresignOperation::Write(_) => {
1117 self.core
1118 .s3_put_object_request(path, None, &OpWrite::default(), Buffer::new())
1119 }
1120 PresignOperation::Delete(_) => Err(Error::new(
1121 ErrorKind::Unsupported,
1122 "operation is not supported",
1123 )),
1124 };
1125 let mut req = req?;
1126
1127 self.core.sign_query(&mut req, expire).await?;
1128
1129 let (parts, _) = req.into_parts();
1131
1132 Ok(RpPresign::new(PresignedRequest::new(
1133 parts.method,
1134 parts.uri,
1135 parts.headers,
1136 )))
1137 }
1138}