mz_storage_types/connections/
aws.rs1use anyhow::{anyhow, bail};
13use aws_config::sts::AssumeRoleProvider;
14use aws_credential_types::Credentials;
15use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider};
16use aws_sdk_sts::error::SdkError;
17use aws_sdk_sts::operation::get_caller_identity::GetCallerIdentityError;
18use aws_types::SdkConfig;
19use aws_types::region::Region;
20use mz_ore::error::ErrorExt;
21use mz_ore::future::{InTask, OreFutureExt};
22use mz_repr::{CatalogItemId, GlobalId};
23use proptest_derive::Arbitrary;
24use serde::{Deserialize, Serialize};
25use serde_json::json;
26
27use crate::AlterCompatible;
28use crate::connections::inline::{
29 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
30 ReferencedConnection,
31};
32use crate::controller::AlterError;
33use crate::{
34 configuration::StorageConfiguration,
35 connections::{ConnectionContext, StringOrSecret},
36};
37
38#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
40pub struct AwsConnection {
41 pub auth: AwsAuth,
42 pub region: Option<String>,
47 pub endpoint: Option<String>,
49}
50
51impl AlterCompatible for AwsConnection {
52 fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
53 Ok(())
55 }
56}
57
58#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
60pub enum AwsAuth {
61 Credentials(AwsCredentials),
63 AssumeRole(AwsAssumeRole),
65}
66
67#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
69pub struct AwsCredentials {
70 pub access_key_id: StringOrSecret,
72 pub secret_access_key: CatalogItemId,
74 pub session_token: Option<StringOrSecret>,
76}
77
78impl AwsCredentials {
79 async fn load_credentials_provider(
81 &self,
82 connection_context: &ConnectionContext,
83 in_task: InTask,
85 ) -> Result<impl ProvideCredentials + use<>, anyhow::Error> {
86 let secrets_reader = &connection_context.secrets_reader;
87 Ok(Credentials::from_keys(
88 self.access_key_id
89 .get_string(in_task, secrets_reader)
91 .await
92 .map_err(|_| {
93 anyhow!("internal error: failed to read access key ID from secret store")
94 })?,
95 connection_context
96 .secrets_reader
97 .read_string(self.secret_access_key)
98 .await
99 .map_err(|_| {
100 anyhow!("internal error: failed to read secret access key from secret store")
101 })?,
102 match &self.session_token {
103 Some(t) => {
104 let t = t.get_string(in_task, secrets_reader).await.map_err(|_| {
105 anyhow!("internal error: failed to read session token from secret store")
106 })?;
107 Some(t)
108 }
109 None => None,
110 },
111 ))
112 }
113}
114
115#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
117pub struct AwsAssumeRole {
118 pub arn: String,
120 pub session_name: Option<String>,
122}
123
124impl AwsAssumeRole {
125 async fn load_credentials_provider(
128 &self,
129 connection_context: &ConnectionContext,
130 connection_id: CatalogItemId,
131 ) -> Result<impl ProvideCredentials + use<>, anyhow::Error> {
132 let external_id = self.external_id(connection_context, connection_id)?;
133 self.dangerously_load_credentials_provider(
137 connection_context,
138 connection_id,
139 Some(external_id),
140 )
141 .await
142 }
143
144 async fn dangerously_load_credentials_provider(
151 &self,
152 connection_context: &ConnectionContext,
153 connection_id: CatalogItemId,
154 external_id: Option<String>,
155 ) -> Result<impl ProvideCredentials + use<>, anyhow::Error> {
156 let Some(aws_connection_role_arn) = &connection_context.aws_connection_role_arn else {
157 bail!("internal error: no AWS connection role configured");
158 };
159
160 let assume_role_sdk_config = mz_aws_util::defaults().load().await;
163
164 let default_session_name =
167 format!("{}-{}", &connection_context.environment_id, connection_id);
168
169 let jump_credentials = AssumeRoleProvider::builder(aws_connection_role_arn)
180 .configure(&assume_role_sdk_config)
181 .session_name(default_session_name.clone())
182 .build()
183 .await;
184
185 let mut credentials = AssumeRoleProvider::builder(&self.arn)
193 .configure(&assume_role_sdk_config)
194 .session_name(self.session_name.clone().unwrap_or(default_session_name));
195 if let Some(external_id) = external_id {
196 credentials = credentials.external_id(external_id);
197 }
198 Ok(credentials.build_from_provider(jump_credentials).await)
199 }
200
201 pub fn external_id(
202 &self,
203 connection_context: &ConnectionContext,
204 connection_id: CatalogItemId,
205 ) -> Result<String, anyhow::Error> {
206 let Some(aws_external_id_prefix) = &connection_context.aws_external_id_prefix else {
207 bail!("internal error: no AWS external ID prefix configured");
208 };
209 Ok(format!("mz_{}_{}", aws_external_id_prefix, connection_id))
210 }
211
212 pub fn example_trust_policy(
213 &self,
214 connection_context: &ConnectionContext,
215 connection_id: CatalogItemId,
216 ) -> Result<serde_json::Value, anyhow::Error> {
217 let Some(aws_connection_role_arn) = &connection_context.aws_connection_role_arn else {
218 bail!("internal error: no AWS connection role configured");
219 };
220 Ok(json!(
221 {
222 "Version": "2012-10-17",
223 "Statement": [
224 {
225 "Effect": "Allow",
226 "Principal": {
227 "AWS": aws_connection_role_arn
228 },
229 "Action": "sts:AssumeRole",
230 "Condition": {
231 "StringEquals": {
232 "sts:ExternalId": self.external_id(connection_context, connection_id)?
233 }
234 }
235 }
236 ]
237 }
238 ))
239 }
240}
241
242impl AwsConnection {
243 pub async fn load_sdk_config(
246 &self,
247 connection_context: &ConnectionContext,
248 connection_id: CatalogItemId,
249 in_task: InTask,
250 ) -> Result<SdkConfig, anyhow::Error> {
251 let connection_context = connection_context.clone();
252 let this = self.clone();
253 async move {
256 let credentials = match &this.auth {
257 AwsAuth::Credentials(credentials) => SharedCredentialsProvider::new(
258 credentials
259 .load_credentials_provider(&connection_context, InTask::No)
260 .await?,
261 ),
262 AwsAuth::AssumeRole(assume_role) => SharedCredentialsProvider::new(
263 assume_role
264 .load_credentials_provider(&connection_context, connection_id)
265 .await?,
266 ),
267 };
268 this.load_sdk_config_from_credentials(credentials).await
269 }
270 .run_in_task_if(in_task, || "load_sdk_config".to_string())
271 .await
272 }
273
274 async fn load_sdk_config_from_credentials(
275 &self,
276 credentials: impl ProvideCredentials + 'static,
277 ) -> Result<SdkConfig, anyhow::Error> {
278 let mut loader = mz_aws_util::defaults().credentials_provider(credentials);
279 if let Some(region) = &self.region {
280 loader = loader.region(Region::new(region.clone()));
281 }
282 if let Some(endpoint) = &self.endpoint {
283 loader = loader.endpoint_url(endpoint);
284 }
285 Ok(loader.load().await)
286 }
287
288 pub(crate) async fn validate(
289 &self,
290 id: CatalogItemId,
291 storage_configuration: &StorageConfiguration,
292 ) -> Result<(), AwsConnectionValidationError> {
293 let aws_config = self
294 .load_sdk_config(
295 &storage_configuration.connection_context,
296 id,
297 InTask::No,
299 )
300 .await?;
301 let sts_client = aws_sdk_sts::Client::new(&aws_config);
302 let _ = sts_client.get_caller_identity().send().await?;
303
304 if let AwsAuth::AssumeRole(assume_role) = &self.auth {
305 let external_id = None;
310 let credentials = assume_role
311 .dangerously_load_credentials_provider(
312 &storage_configuration.connection_context,
313 id,
314 external_id,
315 )
316 .await?;
317 let aws_config = self.load_sdk_config_from_credentials(credentials).await?;
318 let sts_client = aws_sdk_sts::Client::new(&aws_config);
319 if sts_client.get_caller_identity().send().await.is_ok() {
320 return Err(AwsConnectionValidationError::RoleDoesNotRequireExternalId {
321 role_arn: assume_role.arn.clone(),
322 });
323 }
324 }
325
326 Ok(())
327 }
328
329 pub(crate) fn validate_by_default(&self) -> bool {
330 false
331 }
332}
333
334#[derive(thiserror::Error, Debug)]
336pub enum AwsConnectionValidationError {
337 #[error("role trust policy does not require an external ID")]
338 RoleDoesNotRequireExternalId { role_arn: String },
339 #[error("{}", .0.display_with_causes())]
340 StsGetCallerIdentityError(#[from] SdkError<GetCallerIdentityError>),
341 #[error("{}", .0.display_with_causes())]
342 Other(#[from] anyhow::Error),
343}
344
345impl AwsConnectionValidationError {
346 pub fn detail(&self) -> Option<String> {
348 match self {
349 AwsConnectionValidationError::RoleDoesNotRequireExternalId { role_arn } => {
350 Some(format!(
351 "The trust policy for the connection's role ({role_arn}) is insecure and allows any Materialize customer to assume the role."
352 ))
353 }
354 _ => None,
355 }
356 }
357
358 pub fn hint(&self) -> Option<String> {
360 match self {
361 AwsConnectionValidationError::RoleDoesNotRequireExternalId { .. } => {
362 Some("See: https://materialize.com/s/aws-connection-role-trust-policy".into())
363 }
364 _ => None,
365 }
366 }
367}
368
369#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
371pub struct AwsConnectionReference<C: ConnectionAccess = InlinedConnection> {
372 pub connection_id: CatalogItemId,
374 pub connection: C::Aws,
376}
377
378impl<R: ConnectionResolver> IntoInlineConnection<AwsConnectionReference, R>
379 for AwsConnectionReference<ReferencedConnection>
380{
381 fn into_inline_connection(self, r: R) -> AwsConnectionReference {
382 let AwsConnectionReference {
383 connection,
384 connection_id,
385 } = self;
386
387 AwsConnectionReference {
388 connection: r.resolve_connection(connection).unwrap_aws(),
389 connection_id,
390 }
391 }
392}