iceberg/io/
storage_s3.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use opendal::services::S3Config;
23use opendal::{Configurator, Operator};
24pub use reqsign::{AwsCredential, AwsCredentialLoad};
25use reqwest::Client;
26use url::Url;
27
28use crate::io::is_truthy;
29use crate::{Error, ErrorKind, Result};
30
// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3).
// This is a plain comment on purpose: a `///` doc comment here would be attached to
// `S3_ENDPOINT` and replace its rustdoc summary line.

/// S3 endpoint.
pub const S3_ENDPOINT: &str = "s3.endpoint";
/// S3 access key id.
pub const S3_ACCESS_KEY_ID: &str = "s3.access-key-id";
/// S3 secret access key.
pub const S3_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
/// S3 session token.
///
/// This is required when using temporary credentials.
pub const S3_SESSION_TOKEN: &str = "s3.session-token";
/// S3 region.
pub const S3_REGION: &str = "s3.region";
/// Region to use for the S3 client.
///
/// This takes precedence over [`S3_REGION`].
pub const CLIENT_REGION: &str = "client.region";
/// S3 path-style access.
///
/// A truthy value selects path-style addressing; any other value enables virtual-host style
/// addressing. When unset, the storage backend's default is left in place.
pub const S3_PATH_STYLE_ACCESS: &str = "s3.path-style-access";
/// S3 server side encryption type.
///
/// Accepted values (case-insensitive): `none`, `s3`, `kms`, `custom`.
pub const S3_SSE_TYPE: &str = "s3.sse.type";
/// S3 server side encryption key.
///
/// If the S3 encryption type is `kms`, the input is a KMS key ID. If this property is not set,
/// the default key "aws/s3" is used.
/// If the encryption type is `custom`, the input is a custom base-64 AES256 symmetric key.
pub const S3_SSE_KEY: &str = "s3.sse.key";
/// S3 server side encryption key MD5.
///
/// Only consulted when [`S3_SSE_TYPE`] is `custom` (SSE-C).
pub const S3_SSE_MD5: &str = "s3.sse.md5";
/// If set, all AWS clients will assume a role of the given ARN, instead of using the default
/// credential chain.
pub const S3_ASSUME_ROLE_ARN: &str = "client.assume-role.arn";
/// Optional external ID used to assume an IAM role.
pub const S3_ASSUME_ROLE_EXTERNAL_ID: &str = "client.assume-role.external-id";
/// Optional session name used to assume an IAM role.
pub const S3_ASSUME_ROLE_SESSION_NAME: &str = "client.assume-role.session-name";
/// Option to skip signing requests (e.g. for public buckets/folders).
///
/// Takes effect only when set to a truthy value.
pub const S3_ALLOW_ANONYMOUS: &str = "s3.allow-anonymous";
/// Option to skip loading the credential from EC2 metadata (typically used in conjunction with
/// [`S3_ALLOW_ANONYMOUS`]).
///
/// Takes effect only when set to a truthy value.
pub const S3_DISABLE_EC2_METADATA: &str = "s3.disable-ec2-metadata";
/// Option to skip loading configuration from the config file and the environment.
///
/// Takes effect only when set to a truthy value.
pub const S3_DISABLE_CONFIG_LOAD: &str = "s3.disable-config-load";
72
73/// Parse iceberg props to s3 config.
74pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config> {
75    let mut cfg = S3Config::default();
76    if let Some(endpoint) = m.remove(S3_ENDPOINT) {
77        cfg.endpoint = Some(endpoint);
78    };
79    if let Some(access_key_id) = m.remove(S3_ACCESS_KEY_ID) {
80        cfg.access_key_id = Some(access_key_id);
81    };
82    if let Some(secret_access_key) = m.remove(S3_SECRET_ACCESS_KEY) {
83        cfg.secret_access_key = Some(secret_access_key);
84    };
85    if let Some(session_token) = m.remove(S3_SESSION_TOKEN) {
86        cfg.session_token = Some(session_token);
87    };
88    if let Some(region) = m.remove(S3_REGION) {
89        cfg.region = Some(region);
90    };
91    if let Some(region) = m.remove(CLIENT_REGION) {
92        cfg.region = Some(region);
93    };
94    if let Some(path_style_access) = m.remove(S3_PATH_STYLE_ACCESS) {
95        cfg.enable_virtual_host_style = !is_truthy(path_style_access.to_lowercase().as_str());
96    };
97    if let Some(arn) = m.remove(S3_ASSUME_ROLE_ARN) {
98        cfg.role_arn = Some(arn);
99    }
100    if let Some(external_id) = m.remove(S3_ASSUME_ROLE_EXTERNAL_ID) {
101        cfg.external_id = Some(external_id);
102    };
103    if let Some(session_name) = m.remove(S3_ASSUME_ROLE_SESSION_NAME) {
104        cfg.role_session_name = Some(session_name);
105    };
106    let s3_sse_key = m.remove(S3_SSE_KEY);
107    if let Some(sse_type) = m.remove(S3_SSE_TYPE) {
108        match sse_type.to_lowercase().as_str() {
109            // No Server Side Encryption
110            "none" => {}
111            // S3 SSE-S3 encryption (S3 managed keys). https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html
112            "s3" => {
113                cfg.server_side_encryption = Some("AES256".to_string());
114            }
115            // S3 SSE KMS, either using default or custom KMS key. https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html
116            "kms" => {
117                cfg.server_side_encryption = Some("aws:kms".to_string());
118                cfg.server_side_encryption_aws_kms_key_id = s3_sse_key;
119            }
120            // S3 SSE-C, using customer managed keys. https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html
121            "custom" => {
122                cfg.server_side_encryption_customer_algorithm = Some("AES256".to_string());
123                cfg.server_side_encryption_customer_key = s3_sse_key;
124                cfg.server_side_encryption_customer_key_md5 = m.remove(S3_SSE_MD5);
125            }
126            _ => {
127                return Err(Error::new(
128                    ErrorKind::DataInvalid,
129                    format!(
130                        "Invalid {}: {}. Expected one of (custom, kms, s3, none)",
131                        S3_SSE_TYPE, sse_type
132                    ),
133                ));
134            }
135        }
136    };
137
138    if let Some(allow_anonymous) = m.remove(S3_ALLOW_ANONYMOUS) {
139        if is_truthy(allow_anonymous.to_lowercase().as_str()) {
140            cfg.allow_anonymous = true;
141        }
142    }
143    if let Some(disable_ec2_metadata) = m.remove(S3_DISABLE_EC2_METADATA) {
144        if is_truthy(disable_ec2_metadata.to_lowercase().as_str()) {
145            cfg.disable_ec2_metadata = true;
146        }
147    };
148    if let Some(disable_config_load) = m.remove(S3_DISABLE_CONFIG_LOAD) {
149        if is_truthy(disable_config_load.to_lowercase().as_str()) {
150            cfg.disable_config_load = true;
151        }
152    };
153
154    Ok(cfg)
155}
156
157/// Build new opendal operator from give path.
158pub(crate) fn s3_config_build(
159    cfg: &S3Config,
160    customized_credential_load: &Option<CustomAwsCredentialLoader>,
161    path: &str,
162) -> Result<Operator> {
163    let url = Url::parse(path)?;
164    let bucket = url.host_str().ok_or_else(|| {
165        Error::new(
166            ErrorKind::DataInvalid,
167            format!("Invalid s3 url: {}, missing bucket", path),
168        )
169    })?;
170
171    let mut builder = cfg
172        .clone()
173        .into_builder()
174        // Set bucket name.
175        .bucket(bucket);
176
177    if let Some(customized_credential_load) = customized_credential_load {
178        builder = builder
179            .customized_credential_load(customized_credential_load.clone().into_opendal_loader());
180    }
181
182    Ok(Operator::new(builder)?.finish())
183}
184
185/// Custom AWS credential loader.
186/// This can be used to load credentials from a custom source, such as the AWS SDK.
187///
188/// This should be set as an extension on `FileIOBuilder`.
189#[derive(Clone)]
190pub struct CustomAwsCredentialLoader(Arc<dyn AwsCredentialLoad>);
191
192impl std::fmt::Debug for CustomAwsCredentialLoader {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        f.debug_struct("CustomAwsCredentialLoader")
195            .finish_non_exhaustive()
196    }
197}
198
199impl CustomAwsCredentialLoader {
200    /// Create a new custom AWS credential loader.
201    pub fn new(loader: Arc<dyn AwsCredentialLoad>) -> Self {
202        Self(loader)
203    }
204
205    /// Convert this loader into an opendal compatible loader for customized AWS credentials.
206    pub fn into_opendal_loader(self) -> Box<dyn AwsCredentialLoad> {
207        Box::new(self)
208    }
209}
210
211#[async_trait]
212impl AwsCredentialLoad for CustomAwsCredentialLoader {
213    async fn load_credential(&self, client: Client) -> anyhow::Result<Option<AwsCredential>> {
214        self.0.load_credential(client).await
215    }
216}