opendal/services/s3/
backend.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::fmt::Write;
21use std::str::FromStr;
22use std::sync::Arc;
23use std::sync::LazyLock;
24use std::sync::atomic::AtomicBool;
25
26use base64::Engine;
27use base64::prelude::BASE64_STANDARD;
28use constants::X_AMZ_META_PREFIX;
29use constants::X_AMZ_VERSION_ID;
30use http::Response;
31use http::StatusCode;
32use log::debug;
33use log::warn;
34use md5::Digest;
35use md5::Md5;
36use reqsign::AwsAssumeRoleLoader;
37use reqsign::AwsConfig;
38use reqsign::AwsCredentialLoad;
39use reqsign::AwsDefaultLoader;
40use reqsign::AwsV4Signer;
41use reqwest::Url;
42
43use super::S3_SCHEME;
44use super::config::S3Config;
45use super::core::*;
46use super::deleter::S3Deleter;
47use super::error::parse_error;
48use super::lister::S3ListerV1;
49use super::lister::S3ListerV2;
50use super::lister::S3Listers;
51use super::lister::S3ObjectVersionsLister;
52use super::writer::S3Writer;
53use super::writer::S3Writers;
54use crate::raw::*;
55use crate::*;
56
/// Maps a well-known global endpoint to its region-specific template.
///
/// When a user supplies a global endpoint, the `{region}` placeholder of the
/// matching template is substituted to obtain the correct regional endpoint.
static ENDPOINT_TEMPLATES: LazyLock<HashMap<&'static str, &'static str>> = LazyLock::new(|| {
    HashMap::from([
        // AWS S3 Service.
        ("https://s3.amazonaws.com", "https://s3.{region}.amazonaws.com"),
    ])
});
67
/// Fallback for the maximum number of objects per delete request, used when
/// `delete_max_size` has not been configured.
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1_000;
69
70/// Aws S3 and compatible services (including minio, digitalocean space, Tencent Cloud Object Storage(COS) and so on) support.
71/// For more information about s3-compatible services, refer to [Compatible Services](#compatible-services).
72#[doc = include_str!("docs.md")]
73#[doc = include_str!("compatible_services.md")]
74#[derive(Default)]
75pub struct S3Builder {
76    pub(super) config: S3Config,
77
78    pub(super) customized_credential_load: Option<Box<dyn AwsCredentialLoad>>,
79
80    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
81    pub(super) http_client: Option<HttpClient>,
82}
83
84impl Debug for S3Builder {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        f.debug_struct("S3Builder")
87            .field("config", &self.config)
88            .finish_non_exhaustive()
89    }
90}
91
92impl S3Builder {
93    /// Set root of this backend.
94    ///
95    /// All operations will happen under this root.
96    pub fn root(mut self, root: &str) -> Self {
97        self.config.root = if root.is_empty() {
98            None
99        } else {
100            Some(root.to_string())
101        };
102
103        self
104    }
105
106    /// Set bucket name of this backend.
107    pub fn bucket(mut self, bucket: &str) -> Self {
108        self.config.bucket = bucket.to_string();
109
110        self
111    }
112
113    /// Set endpoint of this backend.
114    ///
115    /// Endpoint must be full uri, e.g.
116    ///
117    /// - AWS S3: `https://s3.amazonaws.com` or `https://s3.{region}.amazonaws.com`
118    /// - Cloudflare R2: `https://<ACCOUNT_ID>.r2.cloudflarestorage.com`
119    /// - Aliyun OSS: `https://{region}.aliyuncs.com`
120    /// - Tencent COS: `https://cos.{region}.myqcloud.com`
121    /// - Minio: `http://127.0.0.1:9000`
122    ///
123    /// If user inputs endpoint without scheme like "s3.amazonaws.com", we
124    /// will prepend "https://" before it.
125    pub fn endpoint(mut self, endpoint: &str) -> Self {
126        if !endpoint.is_empty() {
127            // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
128            self.config.endpoint = Some(endpoint.trim_end_matches('/').to_string())
129        }
130
131        self
132    }
133
134    /// Region represent the signing region of this endpoint. This is required
135    /// if you are using the default AWS S3 endpoint.
136    ///
137    /// If using a custom endpoint,
138    /// - If region is set, we will take user's input first.
139    /// - If not, we will try to load it from environment.
140    pub fn region(mut self, region: &str) -> Self {
141        if !region.is_empty() {
142            self.config.region = Some(region.to_string())
143        }
144
145        self
146    }
147
148    /// Set access_key_id of this backend.
149    ///
150    /// - If access_key_id is set, we will take user's input first.
151    /// - If not, we will try to load it from environment.
152    pub fn access_key_id(mut self, v: &str) -> Self {
153        if !v.is_empty() {
154            self.config.access_key_id = Some(v.to_string())
155        }
156
157        self
158    }
159
160    /// Set secret_access_key of this backend.
161    ///
162    /// - If secret_access_key is set, we will take user's input first.
163    /// - If not, we will try to load it from environment.
164    pub fn secret_access_key(mut self, v: &str) -> Self {
165        if !v.is_empty() {
166            self.config.secret_access_key = Some(v.to_string())
167        }
168
169        self
170    }
171
172    /// Set role_arn for this backend.
173    ///
174    /// If `role_arn` is set, we will use already known config as source
175    /// credential to assume role with `role_arn`.
176    pub fn role_arn(mut self, v: &str) -> Self {
177        if !v.is_empty() {
178            self.config.role_arn = Some(v.to_string())
179        }
180
181        self
182    }
183
184    /// Set external_id for this backend.
185    pub fn external_id(mut self, v: &str) -> Self {
186        if !v.is_empty() {
187            self.config.external_id = Some(v.to_string())
188        }
189
190        self
191    }
192
193    /// Set role_session_name for this backend.
194    pub fn role_session_name(mut self, v: &str) -> Self {
195        if !v.is_empty() {
196            self.config.role_session_name = Some(v.to_string())
197        }
198
199        self
200    }
201
202    /// Set default storage_class for this backend.
203    ///
204    /// Available values:
205    /// - `DEEP_ARCHIVE`
206    /// - `GLACIER`
207    /// - `GLACIER_IR`
208    /// - `INTELLIGENT_TIERING`
209    /// - `ONEZONE_IA`
210    /// - `OUTPOSTS`
211    /// - `REDUCED_REDUNDANCY`
212    /// - `STANDARD`
213    /// - `STANDARD_IA`
214    pub fn default_storage_class(mut self, v: &str) -> Self {
215        if !v.is_empty() {
216            self.config.default_storage_class = Some(v.to_string())
217        }
218
219        self
220    }
221
222    /// Set server_side_encryption for this backend.
223    ///
224    /// Available values: `AES256`, `aws:kms`.
225    ///
226    /// # Note
227    ///
228    /// This function is the low-level setting for SSE related features.
229    ///
230    /// SSE related options should be set carefully to make them works.
231    /// Please use `server_side_encryption_with_*` helpers if even possible.
232    pub fn server_side_encryption(mut self, v: &str) -> Self {
233        if !v.is_empty() {
234            self.config.server_side_encryption = Some(v.to_string())
235        }
236
237        self
238    }
239
240    /// Set server_side_encryption_aws_kms_key_id for this backend
241    ///
242    /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id`
243    ///   is not set, S3 will use aws managed kms key to encrypt data.
244    /// - If `server_side_encryption` set to `aws:kms`, and `server_side_encryption_aws_kms_key_id`
245    ///   is a valid kms key id, S3 will use the provided kms key to encrypt data.
246    /// - If the `server_side_encryption_aws_kms_key_id` is invalid or not found, an error will be
247    ///   returned.
248    /// - If `server_side_encryption` is not `aws:kms`, setting `server_side_encryption_aws_kms_key_id` is a noop.
249    ///
250    /// # Note
251    ///
252    /// This function is the low-level setting for SSE related features.
253    ///
254    /// SSE related options should be set carefully to make them works.
255    /// Please use `server_side_encryption_with_*` helpers if even possible.
256    pub fn server_side_encryption_aws_kms_key_id(mut self, v: &str) -> Self {
257        if !v.is_empty() {
258            self.config.server_side_encryption_aws_kms_key_id = Some(v.to_string())
259        }
260
261        self
262    }
263
264    /// Set server_side_encryption_customer_algorithm for this backend.
265    ///
266    /// Available values: `AES256`.
267    ///
268    /// # Note
269    ///
270    /// This function is the low-level setting for SSE related features.
271    ///
272    /// SSE related options should be set carefully to make them works.
273    /// Please use `server_side_encryption_with_*` helpers if even possible.
274    pub fn server_side_encryption_customer_algorithm(mut self, v: &str) -> Self {
275        if !v.is_empty() {
276            self.config.server_side_encryption_customer_algorithm = Some(v.to_string())
277        }
278
279        self
280    }
281
282    /// Set server_side_encryption_customer_key for this backend.
283    ///
284    /// # Args
285    ///
286    /// `v`: base64 encoded key that matches algorithm specified in
287    /// `server_side_encryption_customer_algorithm`.
288    ///
289    /// # Note
290    ///
291    /// This function is the low-level setting for SSE related features.
292    ///
293    /// SSE related options should be set carefully to make them works.
294    /// Please use `server_side_encryption_with_*` helpers if even possible.
295    pub fn server_side_encryption_customer_key(mut self, v: &str) -> Self {
296        if !v.is_empty() {
297            self.config.server_side_encryption_customer_key = Some(v.to_string())
298        }
299
300        self
301    }
302
303    /// Set server_side_encryption_customer_key_md5 for this backend.
304    ///
305    /// # Args
306    ///
307    /// `v`: MD5 digest of key specified in `server_side_encryption_customer_key`.
308    ///
309    /// # Note
310    ///
311    /// This function is the low-level setting for SSE related features.
312    ///
313    /// SSE related options should be set carefully to make them works.
314    /// Please use `server_side_encryption_with_*` helpers if even possible.
315    pub fn server_side_encryption_customer_key_md5(mut self, v: &str) -> Self {
316        if !v.is_empty() {
317            self.config.server_side_encryption_customer_key_md5 = Some(v.to_string())
318        }
319
320        self
321    }
322
323    /// Enable server side encryption with aws managed kms key
324    ///
325    /// As known as: SSE-KMS
326    ///
327    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
328    pub fn server_side_encryption_with_aws_managed_kms_key(mut self) -> Self {
329        self.config.server_side_encryption = Some("aws:kms".to_string());
330        self
331    }
332
333    /// Enable server side encryption with customer managed kms key
334    ///
335    /// As known as: SSE-KMS
336    ///
337    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
338    pub fn server_side_encryption_with_customer_managed_kms_key(
339        mut self,
340        aws_kms_key_id: &str,
341    ) -> Self {
342        self.config.server_side_encryption = Some("aws:kms".to_string());
343        self.config.server_side_encryption_aws_kms_key_id = Some(aws_kms_key_id.to_string());
344        self
345    }
346
347    /// Enable server side encryption with s3 managed key
348    ///
349    /// As known as: SSE-S3
350    ///
351    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
352    pub fn server_side_encryption_with_s3_key(mut self) -> Self {
353        self.config.server_side_encryption = Some("AES256".to_string());
354        self
355    }
356
357    /// Enable server side encryption with customer key.
358    ///
359    /// As known as: SSE-C
360    ///
361    /// NOTE: This function should not be used along with other `server_side_encryption_with_` functions.
362    pub fn server_side_encryption_with_customer_key(mut self, algorithm: &str, key: &[u8]) -> Self {
363        self.config.server_side_encryption_customer_algorithm = Some(algorithm.to_string());
364        self.config.server_side_encryption_customer_key = Some(BASE64_STANDARD.encode(key));
365        let key_md5 = Md5::digest(key);
366        self.config.server_side_encryption_customer_key_md5 = Some(BASE64_STANDARD.encode(key_md5));
367        self
368    }
369
370    /// Set temporary credential used in AWS S3 connections
371    ///
372    /// # Warning
373    ///
374    /// session token's lifetime is short and requires users to refresh in time.
375    pub fn session_token(mut self, token: &str) -> Self {
376        if !token.is_empty() {
377            self.config.session_token = Some(token.to_string());
378        }
379        self
380    }
381
382    /// Disable config load so that opendal will not load config from
383    /// environment.
384    ///
385    /// For examples:
386    ///
387    /// - envs like `AWS_ACCESS_KEY_ID`
388    /// - files like `~/.aws/config`
389    pub fn disable_config_load(mut self) -> Self {
390        self.config.disable_config_load = true;
391        self
392    }
393
394    /// Disable list objects v2 so that opendal will not use the older
395    /// List Objects V1 to list objects.
396    ///
397    /// By default, OpenDAL uses List Objects V2 to list objects. However,
398    /// some legacy services do not yet support V2.
399    pub fn disable_list_objects_v2(mut self) -> Self {
400        self.config.disable_list_objects_v2 = true;
401        self
402    }
403
404    /// Enable request payer so that OpenDAL will send requests with `x-amz-request-payer` header.
405    ///
406    /// With this option the client accepts to pay for the request and data transfer costs.
407    pub fn enable_request_payer(mut self) -> Self {
408        self.config.enable_request_payer = true;
409        self
410    }
411
412    /// Disable load credential from ec2 metadata.
413    ///
414    /// This option is used to disable the default behavior of opendal
415    /// to load credential from ec2 metadata, a.k.a, IMDSv2
416    pub fn disable_ec2_metadata(mut self) -> Self {
417        self.config.disable_ec2_metadata = true;
418        self
419    }
420
421    /// Allow anonymous will allow opendal to send request without signing
422    /// when credential is not loaded.
423    pub fn allow_anonymous(mut self) -> Self {
424        self.config.allow_anonymous = true;
425        self
426    }
427
428    /// Enable virtual host style so that opendal will send API requests
429    /// in virtual host style instead of path style.
430    ///
431    /// - By default, opendal will send API to `https://s3.us-east-1.amazonaws.com/bucket_name`
432    /// - Enabled, opendal will send API to `https://bucket_name.s3.us-east-1.amazonaws.com`
433    pub fn enable_virtual_host_style(mut self) -> Self {
434        self.config.enable_virtual_host_style = true;
435        self
436    }
437
438    /// Disable stat with override so that opendal will not send stat request with override queries.
439    ///
440    /// For example, R2 doesn't support stat with `response_content_type` query.
441    pub fn disable_stat_with_override(mut self) -> Self {
442        self.config.disable_stat_with_override = true;
443        self
444    }
445
446    /// Adding a customized credential load for service.
447    ///
448    /// If customized_credential_load has been set, we will ignore all other
449    /// credential load methods.
450    pub fn customized_credential_load(mut self, cred: Box<dyn AwsCredentialLoad>) -> Self {
451        self.customized_credential_load = Some(cred);
452        self
453    }
454
455    /// Specify the http client that used by this service.
456    ///
457    /// # Notes
458    ///
459    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
460    /// during minor updates.
461    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` instead")]
462    #[allow(deprecated)]
463    pub fn http_client(mut self, client: HttpClient) -> Self {
464        self.http_client = Some(client);
465        self
466    }
467
468    /// Set bucket versioning status for this backend
469    pub fn enable_versioning(mut self, enabled: bool) -> Self {
470        self.config.enable_versioning = enabled;
471
472        self
473    }
474
475    /// Check if `bucket` is valid
476    /// `bucket` must be not empty and if `enable_virtual_host_style` is true
477    /// it couldn't contain dot(.) character
478    fn is_bucket_valid(&self) -> bool {
479        if self.config.bucket.is_empty() {
480            return false;
481        }
482        // If enable virtual host style, `bucket` will reside in domain part,
483        // for example `https://bucket_name.s3.us-east-1.amazonaws.com`,
484        // so `bucket` with dot can't be recognized correctly for this format.
485        if self.config.enable_virtual_host_style && self.config.bucket.contains('.') {
486            return false;
487        }
488        true
489    }
490
491    /// Build endpoint with given region.
492    fn build_endpoint(&self, region: &str) -> String {
493        let bucket = {
494            debug_assert!(self.is_bucket_valid(), "bucket must be valid");
495
496            self.config.bucket.as_str()
497        };
498
499        let mut endpoint = match &self.config.endpoint {
500            Some(endpoint) => {
501                if endpoint.starts_with("http") {
502                    endpoint.to_string()
503                } else {
504                    // Prefix https if endpoint doesn't start with scheme.
505                    format!("https://{endpoint}")
506                }
507            }
508            None => "https://s3.amazonaws.com".to_string(),
509        };
510
511        // If endpoint contains bucket name, we should trim them.
512        endpoint = endpoint.replace(&format!("//{bucket}."), "//");
513
514        // Omit default ports if specified.
515        if let Ok(url) = Url::from_str(&endpoint) {
516            // Remove the trailing `/` of root path.
517            endpoint = url.to_string().trim_end_matches('/').to_string();
518        }
519
520        // Update with endpoint templates.
521        endpoint = if let Some(template) = ENDPOINT_TEMPLATES.get(endpoint.as_str()) {
522            template.replace("{region}", region)
523        } else {
524            // If we don't know where about this endpoint, just leave
525            // them as it.
526            endpoint.to_string()
527        };
528
529        // Apply virtual host style.
530        if self.config.enable_virtual_host_style {
531            endpoint = endpoint.replace("//", &format!("//{bucket}."))
532        } else {
533            write!(endpoint, "/{bucket}").expect("write into string must succeed");
534        };
535
536        endpoint
537    }
538
539    /// Set maximum batch operations of this backend.
540    #[deprecated(
541        since = "0.52.0",
542        note = "Please use `delete_max_size` instead of `batch_max_operations`"
543    )]
544    pub fn batch_max_operations(mut self, batch_max_operations: usize) -> Self {
545        self.config.delete_max_size = Some(batch_max_operations);
546
547        self
548    }
549
550    /// Set maximum delete operations of this backend.
551    pub fn delete_max_size(mut self, delete_max_size: usize) -> Self {
552        self.config.delete_max_size = Some(delete_max_size);
553
554        self
555    }
556
557    /// Set checksum algorithm of this backend.
558    /// This is necessary when writing to AWS S3 Buckets with Object Lock enabled for example.
559    ///
560    /// Available options:
561    /// - "crc32c"
562    pub fn checksum_algorithm(mut self, checksum_algorithm: &str) -> Self {
563        self.config.checksum_algorithm = Some(checksum_algorithm.to_string());
564
565        self
566    }
567
568    /// Disable write with if match so that opendal will not send write request with if match headers.
569    pub fn disable_write_with_if_match(mut self) -> Self {
570        self.config.disable_write_with_if_match = true;
571        self
572    }
573
574    /// Enable write with append so that opendal will send write request with append headers.
575    pub fn enable_write_with_append(mut self) -> Self {
576        self.config.enable_write_with_append = true;
577        self
578    }
579
580    /// Detect region of S3 bucket.
581    ///
582    /// # Args
583    ///
584    /// - endpoint: the endpoint of S3 service
585    /// - bucket: the bucket of S3 service
586    ///
587    /// # Return
588    ///
589    /// - `Some(region)` means we detect the region successfully
590    /// - `None` means we can't detect the region or meeting errors.
591    ///
592    /// # Notes
593    ///
594    /// We will try to detect region by the following methods.
595    ///
596    /// - Match endpoint with given rules to get region
597    ///   - Cloudflare R2
598    ///   - AWS S3
599    ///   - Aliyun OSS
600    /// - Send a `HEAD` request to endpoint with bucket name to get `x-amz-bucket-region`.
601    ///
602    /// # Examples
603    ///
604    /// ```no_run
605    /// use opendal::services::S3;
606    ///
607    /// # async fn example() {
608    /// let region: Option<String> = S3::detect_region("https://s3.amazonaws.com", "example").await;
609    /// # }
610    /// ```
611    ///
612    /// # Reference
613    ///
614    /// - [Amazon S3 HeadBucket API](https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/API/API_HeadBucket.html)
615    pub async fn detect_region(endpoint: &str, bucket: &str) -> Option<String> {
616        // Remove the possible trailing `/` in endpoint.
617        let endpoint = endpoint.trim_end_matches('/');
618
619        // Make sure the endpoint contains the scheme.
620        let mut endpoint = if endpoint.starts_with("http") {
621            endpoint.to_string()
622        } else {
623            // Prefix https if endpoint doesn't start with scheme.
624            format!("https://{endpoint}")
625        };
626
627        // Remove bucket name from endpoint.
628        endpoint = endpoint.replace(&format!("//{bucket}."), "//");
629        let url = format!("{endpoint}/{bucket}");
630
631        debug!("detect region with url: {url}");
632
633        // Try to detect region by endpoint.
634
635        // If this bucket is R2, we can return auto directly.
636        //
637        // Reference: <https://developers.cloudflare.com/r2/api/s3/api/>
638        if endpoint.ends_with("r2.cloudflarestorage.com") {
639            return Some("auto".to_string());
640        }
641
642        // If this bucket is AWS, we can try to match the endpoint.
643        if let Some(v) = endpoint.strip_prefix("https://s3.") {
644            if let Some(region) = v.strip_suffix(".amazonaws.com") {
645                return Some(region.to_string());
646            }
647        }
648
649        // If this bucket is OSS, we can try to match the endpoint.
650        //
651        // - `oss-ap-southeast-1.aliyuncs.com` => `oss-ap-southeast-1`
652        // - `oss-cn-hangzhou-internal.aliyuncs.com` => `oss-cn-hangzhou`
653        if let Some(v) = endpoint.strip_prefix("https://") {
654            if let Some(region) = v.strip_suffix(".aliyuncs.com") {
655                return Some(region.to_string());
656            }
657
658            if let Some(region) = v.strip_suffix("-internal.aliyuncs.com") {
659                return Some(region.to_string());
660            }
661        }
662
663        // Try to detect region by HeadBucket.
664        let req = http::Request::head(&url).body(Buffer::new()).ok()?;
665
666        let client = HttpClient::new().ok()?;
667        let res = client
668            .send(req)
669            .await
670            .map_err(|err| warn!("detect region failed for: {err:?}"))
671            .ok()?;
672
673        debug!(
674            "auto detect region got response: status {:?}, header: {:?}",
675            res.status(),
676            res.headers()
677        );
678
679        // Get region from response header no matter status code.
680        if let Some(header) = res.headers().get("x-amz-bucket-region") {
681            if let Ok(regin) = header.to_str() {
682                return Some(regin.to_string());
683            }
684        }
685
686        // Status code is 403 or 200 means we already visit the correct
687        // region, we can use the default region directly.
688        if res.status() == StatusCode::FORBIDDEN || res.status() == StatusCode::OK {
689            return Some("us-east-1".to_string());
690        }
691
692        None
693    }
694}
695
696impl Builder for S3Builder {
697    type Config = S3Config;
698
699    fn build(mut self) -> Result<impl Access> {
700        debug!("backend build started: {:?}", &self);
701
702        let root = normalize_root(&self.config.root.clone().unwrap_or_default());
703        debug!("backend use root {}", &root);
704
705        // Handle bucket name.
706        let bucket = if self.is_bucket_valid() {
707            Ok(&self.config.bucket)
708        } else {
709            Err(
710                Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
711                    .with_context("service", S3_SCHEME),
712            )
713        }?;
714        debug!("backend use bucket {}", &bucket);
715
716        let default_storage_class = match &self.config.default_storage_class {
717            None => None,
718            Some(v) => Some(
719                build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?,
720            ),
721        };
722
723        let server_side_encryption = match &self.config.server_side_encryption {
724            None => None,
725            Some(v) => Some(
726                build_header_value(v)
727                    .map_err(|err| err.with_context("key", "server_side_encryption"))?,
728            ),
729        };
730
731        let server_side_encryption_aws_kms_key_id =
732            match &self.config.server_side_encryption_aws_kms_key_id {
733                None => None,
734                Some(v) => Some(build_header_value(v).map_err(|err| {
735                    err.with_context("key", "server_side_encryption_aws_kms_key_id")
736                })?),
737            };
738
739        let server_side_encryption_customer_algorithm =
740            match &self.config.server_side_encryption_customer_algorithm {
741                None => None,
742                Some(v) => Some(build_header_value(v).map_err(|err| {
743                    err.with_context("key", "server_side_encryption_customer_algorithm")
744                })?),
745            };
746
747        let server_side_encryption_customer_key =
748            match &self.config.server_side_encryption_customer_key {
749                None => None,
750                Some(v) => Some(build_header_value(v).map_err(|err| {
751                    err.with_context("key", "server_side_encryption_customer_key")
752                })?),
753            };
754
755        let server_side_encryption_customer_key_md5 =
756            match &self.config.server_side_encryption_customer_key_md5 {
757                None => None,
758                Some(v) => Some(build_header_value(v).map_err(|err| {
759                    err.with_context("key", "server_side_encryption_customer_key_md5")
760                })?),
761            };
762
763        let checksum_algorithm = match self.config.checksum_algorithm.as_deref() {
764            Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
765            Some("md5") => Some(ChecksumAlgorithm::Md5),
766            None => None,
767            v => {
768                return Err(Error::new(
769                    ErrorKind::ConfigInvalid,
770                    format!("{v:?} is not a supported checksum_algorithm."),
771                ));
772            }
773        };
774
775        // This is our current config.
776        let mut cfg = AwsConfig::default();
777        if !self.config.disable_config_load {
778            #[cfg(not(target_arch = "wasm32"))]
779            {
780                cfg = cfg.from_profile();
781                cfg = cfg.from_env();
782            }
783        }
784
785        if let Some(ref v) = self.config.region {
786            cfg.region = Some(v.to_string());
787        }
788
789        if cfg.region.is_none() {
790            return Err(Error::new(
791                ErrorKind::ConfigInvalid,
792                "region is missing. Please find it by S3::detect_region() or set them in env.",
793            )
794            .with_operation("Builder::build")
795            .with_context("service", S3_SCHEME));
796        }
797
798        let region = cfg.region.to_owned().unwrap();
799        debug!("backend use region: {region}");
800
801        // Retain the user's endpoint if it exists; otherwise, try loading it from the environment.
802        self.config.endpoint = self.config.endpoint.or_else(|| cfg.endpoint_url.clone());
803
804        // Building endpoint.
805        let endpoint = self.build_endpoint(&region);
806        debug!("backend use endpoint: {endpoint}");
807
808        // Setting all value from user input if available.
809        if let Some(v) = self.config.access_key_id {
810            cfg.access_key_id = Some(v)
811        }
812        if let Some(v) = self.config.secret_access_key {
813            cfg.secret_access_key = Some(v)
814        }
815        if let Some(v) = self.config.session_token {
816            cfg.session_token = Some(v)
817        }
818
819        let mut loader: Option<Box<dyn AwsCredentialLoad>> = None;
820        // If customized_credential_load is set, we will use it.
821        if let Some(v) = self.customized_credential_load {
822            loader = Some(v);
823        }
824
825        // If role_arn is set, we must use AssumeRoleLoad.
826        if let Some(role_arn) = self.config.role_arn {
827            // use current env as source credential loader.
828            let default_loader =
829                AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg.clone());
830
831            // Build the config for assume role.
832            let mut assume_role_cfg = AwsConfig {
833                region: Some(region.clone()),
834                role_arn: Some(role_arn),
835                external_id: self.config.external_id.clone(),
836                sts_regional_endpoints: "regional".to_string(),
837                ..Default::default()
838            };
839
840            // override default role_session_name if set
841            if let Some(name) = self.config.role_session_name {
842                assume_role_cfg.role_session_name = name;
843            }
844
845            let assume_role_loader = AwsAssumeRoleLoader::new(
846                GLOBAL_REQWEST_CLIENT.clone().clone(),
847                assume_role_cfg,
848                Box::new(default_loader),
849            )
850            .map_err(|err| {
851                Error::new(
852                    ErrorKind::ConfigInvalid,
853                    "The assume_role_loader is misconfigured",
854                )
855                .with_context("service", S3_SCHEME)
856                .set_source(err)
857            })?;
858            loader = Some(Box::new(assume_role_loader));
859        }
860        // If loader is not set, we will use default loader.
861        let loader = match loader {
862            Some(v) => v,
863            None => {
864                let mut default_loader =
865                    AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
866                if self.config.disable_ec2_metadata {
867                    default_loader = default_loader.with_disable_ec2_metadata();
868                }
869
870                Box::new(default_loader)
871            }
872        };
873
874        let signer = AwsV4Signer::new("s3", &region);
875
876        let delete_max_size = self
877            .config
878            .delete_max_size
879            .unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
880
881        Ok(S3Backend {
882            core: Arc::new(S3Core {
883                info: {
884                    let am = AccessorInfo::default();
885                    am.set_scheme(S3_SCHEME)
886                        .set_root(&root)
887                        .set_name(bucket)
888                        .set_native_capability(Capability {
889                            stat: true,
890                            stat_with_if_match: true,
891                            stat_with_if_none_match: true,
892                            stat_with_if_modified_since: true,
893                            stat_with_if_unmodified_since: true,
894                            stat_with_override_cache_control: !self
895                                .config
896                                .disable_stat_with_override,
897                            stat_with_override_content_disposition: !self
898                                .config
899                                .disable_stat_with_override,
900                            stat_with_override_content_type: !self
901                                .config
902                                .disable_stat_with_override,
903                            stat_with_version: self.config.enable_versioning,
904
905                            read: true,
906                            read_with_if_match: true,
907                            read_with_if_none_match: true,
908                            read_with_if_modified_since: true,
909                            read_with_if_unmodified_since: true,
910                            read_with_override_cache_control: true,
911                            read_with_override_content_disposition: true,
912                            read_with_override_content_type: true,
913                            read_with_version: self.config.enable_versioning,
914
915                            write: true,
916                            write_can_empty: true,
917                            write_can_multi: true,
918                            write_can_append: self.config.enable_write_with_append,
919
920                            write_with_cache_control: true,
921                            write_with_content_type: true,
922                            write_with_content_encoding: true,
923                            write_with_if_match: !self.config.disable_write_with_if_match,
924                            write_with_if_not_exists: true,
925                            write_with_user_metadata: true,
926
927                            // The min multipart size of S3 is 5 MiB.
928                            //
929                            // ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
930                            write_multi_min_size: Some(5 * 1024 * 1024),
931                            // The max multipart size of S3 is 5 GiB.
932                            //
933                            // ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
934                            write_multi_max_size: if cfg!(target_pointer_width = "64") {
935                                Some(5 * 1024 * 1024 * 1024)
936                            } else {
937                                Some(usize::MAX)
938                            },
939
940                            delete: true,
941                            delete_max_size: Some(delete_max_size),
942                            delete_with_version: self.config.enable_versioning,
943
944                            copy: true,
945
946                            list: true,
947                            list_with_limit: true,
948                            list_with_start_after: true,
949                            list_with_recursive: true,
950                            list_with_versions: self.config.enable_versioning,
951                            list_with_deleted: self.config.enable_versioning,
952
953                            presign: true,
954                            presign_stat: true,
955                            presign_read: true,
956                            presign_write: true,
957
958                            shared: true,
959
960                            ..Default::default()
961                        });
962
963                    // allow deprecated api here for compatibility
964                    #[allow(deprecated)]
965                    if let Some(client) = self.http_client {
966                        am.update_http_client(|_| client);
967                    }
968
969                    am.into()
970                },
971                bucket: bucket.to_string(),
972                endpoint,
973                root,
974                server_side_encryption,
975                server_side_encryption_aws_kms_key_id,
976                server_side_encryption_customer_algorithm,
977                server_side_encryption_customer_key,
978                server_side_encryption_customer_key_md5,
979                default_storage_class,
980                allow_anonymous: self.config.allow_anonymous,
981                disable_list_objects_v2: self.config.disable_list_objects_v2,
982                enable_request_payer: self.config.enable_request_payer,
983                signer,
984                loader,
985                credential_loaded: AtomicBool::new(false),
986                checksum_algorithm,
987            }),
988        })
989    }
990}
991
/// Backend for s3 services.
///
/// This is a thin handle around a shared `S3Core`. Because the only field is
/// an `Arc`, the derived `Clone` copies the pointer, so every clone refers to
/// the same core.
#[derive(Debug, Clone)]
pub struct S3Backend {
    /// Shared core that the `Access` implementation delegates its requests to.
    core: Arc<S3Core>,
}
997
998impl Access for S3Backend {
999    type Reader = HttpBody;
1000    type Writer = S3Writers;
1001    type Lister = S3Listers;
1002    type Deleter = oio::BatchDeleter<S3Deleter>;
1003
1004    fn info(&self) -> Arc<AccessorInfo> {
1005        self.core.info.clone()
1006    }
1007
1008    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
1009        let resp = self.core.s3_head_object(path, args).await?;
1010
1011        let status = resp.status();
1012
1013        match status {
1014            StatusCode::OK => {
1015                let headers = resp.headers();
1016                let mut meta = parse_into_metadata(path, headers)?;
1017
1018                let user_meta = parse_prefixed_headers(headers, X_AMZ_META_PREFIX);
1019                if !user_meta.is_empty() {
1020                    meta = meta.with_user_metadata(user_meta);
1021                }
1022
1023                if let Some(v) = parse_header_to_str(headers, X_AMZ_VERSION_ID)? {
1024                    meta.set_version(v);
1025                }
1026
1027                Ok(RpStat::new(meta))
1028            }
1029            _ => Err(parse_error(resp)),
1030        }
1031    }
1032
1033    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
1034        let resp = self.core.s3_get_object(path, args.range(), &args).await?;
1035
1036        let status = resp.status();
1037        match status {
1038            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
1039                Ok((RpRead::default(), resp.into_body()))
1040            }
1041            _ => {
1042                let (part, mut body) = resp.into_parts();
1043                let buf = body.to_buffer().await?;
1044                Err(parse_error(Response::from_parts(part, buf)))
1045            }
1046        }
1047    }
1048
1049    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
1050        let writer = S3Writer::new(self.core.clone(), path, args.clone());
1051
1052        let w = if args.append() {
1053            S3Writers::Two(oio::AppendWriter::new(writer))
1054        } else {
1055            S3Writers::One(oio::MultipartWriter::new(
1056                self.core.info.clone(),
1057                writer,
1058                args.concurrent(),
1059            ))
1060        };
1061
1062        Ok((RpWrite::default(), w))
1063    }
1064
1065    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
1066        Ok((
1067            RpDelete::default(),
1068            oio::BatchDeleter::new(S3Deleter::new(self.core.clone())),
1069        ))
1070    }
1071
1072    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
1073        let l = if args.versions() || args.deleted() {
1074            ThreeWays::Three(oio::PageLister::new(S3ObjectVersionsLister::new(
1075                self.core.clone(),
1076                path,
1077                args,
1078            )))
1079        } else if self.core.disable_list_objects_v2 {
1080            ThreeWays::One(oio::PageLister::new(S3ListerV1::new(
1081                self.core.clone(),
1082                path,
1083                args,
1084            )))
1085        } else {
1086            ThreeWays::Two(oio::PageLister::new(S3ListerV2::new(
1087                self.core.clone(),
1088                path,
1089                args,
1090            )))
1091        };
1092
1093        Ok((RpList::default(), l))
1094    }
1095
1096    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
1097        let resp = self.core.s3_copy_object(from, to).await?;
1098
1099        let status = resp.status();
1100
1101        match status {
1102            StatusCode::OK => Ok(RpCopy::default()),
1103            _ => Err(parse_error(resp)),
1104        }
1105    }
1106
1107    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
1108        let (expire, op) = args.into_parts();
1109        // We will not send this request out, just for signing.
1110        let req = match op {
1111            PresignOperation::Stat(v) => self.core.s3_head_object_request(path, v),
1112            PresignOperation::Read(v) => {
1113                self.core
1114                    .s3_get_object_request(path, BytesRange::default(), &v)
1115            }
1116            PresignOperation::Write(_) => {
1117                self.core
1118                    .s3_put_object_request(path, None, &OpWrite::default(), Buffer::new())
1119            }
1120            PresignOperation::Delete(_) => Err(Error::new(
1121                ErrorKind::Unsupported,
1122                "operation is not supported",
1123            )),
1124        };
1125        let mut req = req?;
1126
1127        self.core.sign_query(&mut req, expire).await?;
1128
1129        // We don't need this request anymore, consume it directly.
1130        let (parts, _) = req.into_parts();
1131
1132        Ok(RpPresign::new(PresignedRequest::new(
1133            parts.method,
1134            parts.uri,
1135            parts.headers,
1136        )))
1137    }
1138}