1use std::collections::HashMap;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use opendal::services::S3Config;
23use opendal::{Configurator, Operator};
24pub use reqsign::{AwsCredential, AwsCredentialLoad};
25use reqwest::Client;
26use url::Url;
27
28use crate::io::is_truthy;
29use crate::{Error, ErrorKind, Result};
30
31pub const S3_ENDPOINT: &str = "s3.endpoint";
34pub const S3_ACCESS_KEY_ID: &str = "s3.access-key-id";
36pub const S3_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
38pub const S3_SESSION_TOKEN: &str = "s3.session-token";
41pub const S3_REGION: &str = "s3.region";
43pub const CLIENT_REGION: &str = "client.region";
47pub const S3_PATH_STYLE_ACCESS: &str = "s3.path-style-access";
49pub const S3_SSE_TYPE: &str = "s3.sse.type";
51pub const S3_SSE_KEY: &str = "s3.sse.key";
56pub const S3_SSE_MD5: &str = "s3.sse.md5";
58pub const S3_ASSUME_ROLE_ARN: &str = "client.assume-role.arn";
61pub const S3_ASSUME_ROLE_EXTERNAL_ID: &str = "client.assume-role.external-id";
63pub const S3_ASSUME_ROLE_SESSION_NAME: &str = "client.assume-role.session-name";
65pub const S3_ALLOW_ANONYMOUS: &str = "s3.allow-anonymous";
67pub const S3_DISABLE_EC2_METADATA: &str = "s3.disable-ec2-metadata";
70pub const S3_DISABLE_CONFIG_LOAD: &str = "s3.disable-config-load";
72
73pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config> {
75 let mut cfg = S3Config::default();
76 if let Some(endpoint) = m.remove(S3_ENDPOINT) {
77 cfg.endpoint = Some(endpoint);
78 };
79 if let Some(access_key_id) = m.remove(S3_ACCESS_KEY_ID) {
80 cfg.access_key_id = Some(access_key_id);
81 };
82 if let Some(secret_access_key) = m.remove(S3_SECRET_ACCESS_KEY) {
83 cfg.secret_access_key = Some(secret_access_key);
84 };
85 if let Some(session_token) = m.remove(S3_SESSION_TOKEN) {
86 cfg.session_token = Some(session_token);
87 };
88 if let Some(region) = m.remove(S3_REGION) {
89 cfg.region = Some(region);
90 };
91 if let Some(region) = m.remove(CLIENT_REGION) {
92 cfg.region = Some(region);
93 };
94 if let Some(path_style_access) = m.remove(S3_PATH_STYLE_ACCESS) {
95 cfg.enable_virtual_host_style = !is_truthy(path_style_access.to_lowercase().as_str());
96 };
97 if let Some(arn) = m.remove(S3_ASSUME_ROLE_ARN) {
98 cfg.role_arn = Some(arn);
99 }
100 if let Some(external_id) = m.remove(S3_ASSUME_ROLE_EXTERNAL_ID) {
101 cfg.external_id = Some(external_id);
102 };
103 if let Some(session_name) = m.remove(S3_ASSUME_ROLE_SESSION_NAME) {
104 cfg.role_session_name = Some(session_name);
105 };
106 let s3_sse_key = m.remove(S3_SSE_KEY);
107 if let Some(sse_type) = m.remove(S3_SSE_TYPE) {
108 match sse_type.to_lowercase().as_str() {
109 "none" => {}
111 "s3" => {
113 cfg.server_side_encryption = Some("AES256".to_string());
114 }
115 "kms" => {
117 cfg.server_side_encryption = Some("aws:kms".to_string());
118 cfg.server_side_encryption_aws_kms_key_id = s3_sse_key;
119 }
120 "custom" => {
122 cfg.server_side_encryption_customer_algorithm = Some("AES256".to_string());
123 cfg.server_side_encryption_customer_key = s3_sse_key;
124 cfg.server_side_encryption_customer_key_md5 = m.remove(S3_SSE_MD5);
125 }
126 _ => {
127 return Err(Error::new(
128 ErrorKind::DataInvalid,
129 format!(
130 "Invalid {}: {}. Expected one of (custom, kms, s3, none)",
131 S3_SSE_TYPE, sse_type
132 ),
133 ));
134 }
135 }
136 };
137
138 if let Some(allow_anonymous) = m.remove(S3_ALLOW_ANONYMOUS) {
139 if is_truthy(allow_anonymous.to_lowercase().as_str()) {
140 cfg.allow_anonymous = true;
141 }
142 }
143 if let Some(disable_ec2_metadata) = m.remove(S3_DISABLE_EC2_METADATA) {
144 if is_truthy(disable_ec2_metadata.to_lowercase().as_str()) {
145 cfg.disable_ec2_metadata = true;
146 }
147 };
148 if let Some(disable_config_load) = m.remove(S3_DISABLE_CONFIG_LOAD) {
149 if is_truthy(disable_config_load.to_lowercase().as_str()) {
150 cfg.disable_config_load = true;
151 }
152 };
153
154 Ok(cfg)
155}
156
157pub(crate) fn s3_config_build(
159 cfg: &S3Config,
160 customized_credential_load: &Option<CustomAwsCredentialLoader>,
161 path: &str,
162) -> Result<Operator> {
163 let url = Url::parse(path)?;
164 let bucket = url.host_str().ok_or_else(|| {
165 Error::new(
166 ErrorKind::DataInvalid,
167 format!("Invalid s3 url: {}, missing bucket", path),
168 )
169 })?;
170
171 let mut builder = cfg
172 .clone()
173 .into_builder()
174 .bucket(bucket);
176
177 if let Some(customized_credential_load) = customized_credential_load {
178 builder = builder
179 .customized_credential_load(customized_credential_load.clone().into_opendal_loader());
180 }
181
182 Ok(Operator::new(builder)?.finish())
183}
184
185#[derive(Clone)]
190pub struct CustomAwsCredentialLoader(Arc<dyn AwsCredentialLoad>);
191
192impl std::fmt::Debug for CustomAwsCredentialLoader {
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 f.debug_struct("CustomAwsCredentialLoader")
195 .finish_non_exhaustive()
196 }
197}
198
199impl CustomAwsCredentialLoader {
200 pub fn new(loader: Arc<dyn AwsCredentialLoad>) -> Self {
202 Self(loader)
203 }
204
205 pub fn into_opendal_loader(self) -> Box<dyn AwsCredentialLoad> {
207 Box::new(self)
208 }
209}
210
211#[async_trait]
212impl AwsCredentialLoad for CustomAwsCredentialLoader {
213 async fn load_credential(&self, client: Client) -> anyhow::Result<Option<AwsCredential>> {
214 self.0.load_credential(client).await
215 }
216}