mz_storage_types/connections/
aws.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! AWS configuration for sources and sinks.
11
12use anyhow::{anyhow, bail};
13use aws_config::sts::AssumeRoleProvider;
14use aws_credential_types::Credentials;
15use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider};
16use aws_sdk_sts::error::SdkError;
17use aws_sdk_sts::operation::get_caller_identity::GetCallerIdentityError;
18use aws_types::SdkConfig;
19use aws_types::region::Region;
20use mz_ore::error::ErrorExt;
21use mz_ore::future::{InTask, OreFutureExt};
22use mz_repr::{CatalogItemId, GlobalId};
23use proptest_derive::Arbitrary;
24use serde::{Deserialize, Serialize};
25use serde_json::json;
26
27use crate::AlterCompatible;
28use crate::connections::inline::{
29    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
30    ReferencedConnection,
31};
32use crate::controller::AlterError;
33use crate::{
34    configuration::StorageConfiguration,
35    connections::{ConnectionContext, StringOrSecret},
36};
37
/// AWS connection configuration.
///
/// Combines the authentication method with optional region and endpoint
/// overrides for the AWS SDK configuration.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct AwsConnection {
    /// How to authenticate with AWS.
    pub auth: AwsAuth,
    /// The AWS region to use.
    ///
    /// Uses the default region (looking at env vars, config files, etc) if not
    /// provided.
    pub region: Option<String>,
    /// The custom AWS endpoint to use, if any.
    pub endpoint: Option<String>,
}
50
51impl AlterCompatible for AwsConnection {
52    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
53        // Every element of the AWS connection is configurable.
54        Ok(())
55    }
56}
57
/// Describes how to authenticate with AWS.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum AwsAuth {
    /// Authenticate with an access key.
    Credentials(AwsCredentials),
    /// Authenticate via assuming an IAM role.
    AssumeRole(AwsAssumeRole),
}
66
/// AWS credentials to access an AWS account using user access keys.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct AwsCredentials {
    /// The AWS API Access Key required to connect to the AWS account.
    pub access_key_id: StringOrSecret,
    /// The Secret Access Key required to connect to the AWS account.
    ///
    /// This is the ID of the secret holding the key, not the key itself; the
    /// value is read from the secrets reader when credentials are loaded.
    pub secret_access_key: CatalogItemId,
    /// Optional session token to connect to the AWS account.
    pub session_token: Option<StringOrSecret>,
}
77
78impl AwsCredentials {
79    /// Loads a credentials provider with the configured credentials.
80    async fn load_credentials_provider(
81        &self,
82        connection_context: &ConnectionContext,
83        // Whether or not to do IO in a separate Tokio task.
84        in_task: InTask,
85    ) -> Result<impl ProvideCredentials + use<>, anyhow::Error> {
86        let secrets_reader = &connection_context.secrets_reader;
87        Ok(Credentials::from_keys(
88            self.access_key_id
89                // We will already be contained within a tokio task from `load_sdk_config`.
90                .get_string(in_task, secrets_reader)
91                .await
92                .map_err(|_| {
93                    anyhow!("internal error: failed to read access key ID from secret store")
94                })?,
95            connection_context
96                .secrets_reader
97                .read_string(self.secret_access_key)
98                .await
99                .map_err(|_| {
100                    anyhow!("internal error: failed to read secret access key from secret store")
101                })?,
102            match &self.session_token {
103                Some(t) => {
104                    let t = t.get_string(in_task, secrets_reader).await.map_err(|_| {
105                        anyhow!("internal error: failed to read session token from secret store")
106                    })?;
107                    Some(t)
108                }
109                None => None,
110            },
111        ))
112    }
113}
114
/// Describes an AWS IAM role to assume.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct AwsAssumeRole {
    /// The Amazon Resource Name of the role to assume.
    pub arn: String,
    /// The optional session name for the session.
    ///
    /// When absent, a default name built from the environment ID and the
    /// connection ID is used instead.
    pub session_name: Option<String>,
}
123
124impl AwsAssumeRole {
125    /// Loads a credentials provider that will assume the specified role
126    /// with the appropriate external ID.
127    async fn load_credentials_provider(
128        &self,
129        connection_context: &ConnectionContext,
130        connection_id: CatalogItemId,
131    ) -> Result<impl ProvideCredentials + use<>, anyhow::Error> {
132        let external_id = self.external_id(connection_context, connection_id)?;
133        // It's okay to use `dangerously_load_credentials_provider` here, as
134        // this is the method that provides a safe wrapper by forcing use of the
135        // correct external ID.
136        self.dangerously_load_credentials_provider(
137            connection_context,
138            connection_id,
139            Some(external_id),
140        )
141        .await
142    }
143
144    /// DANGEROUS: only for internal use!
145    ///
146    /// Like `load_credentials_provider`, but accepts an arbitrary external ID.
147    /// Only for use in the internal implementation of AWS connections. Using
148    /// this method incorrectly can result in violating our AWS security
149    /// requirements.
150    async fn dangerously_load_credentials_provider(
151        &self,
152        connection_context: &ConnectionContext,
153        connection_id: CatalogItemId,
154        external_id: Option<String>,
155    ) -> Result<impl ProvideCredentials + use<>, anyhow::Error> {
156        let Some(aws_connection_role_arn) = &connection_context.aws_connection_role_arn else {
157            bail!("internal error: no AWS connection role configured");
158        };
159
160        // Load the default SDK configuration to use for the assume role
161        // operations themselves.
162        let assume_role_sdk_config = mz_aws_util::defaults().load().await;
163
164        // The default session name identifies the environment and the
165        // connection.
166        let default_session_name =
167            format!("{}-{}", &connection_context.environment_id, connection_id);
168
169        // First we create a credentials provider that will assume the "jump
170        // role" provided to this Materialize environment. This is the role that
171        // we've told the end user to allow in their role trust policy. No need
172        // to specify the external ID here as we're still within the Materialize
173        // sphere of trust. The ambient AWS credentials provided to this
174        // environment will be provided via the default credentials change and
175        // allow us to assume the jump role. We always use the default session
176        // name here, so that we can identify the specific environment and
177        // connection ID that initiated the session in our internal CloudTrail
178        // logs. This session isn't visible to the end user.
179        let jump_credentials = AssumeRoleProvider::builder(aws_connection_role_arn)
180            .configure(&assume_role_sdk_config)
181            .session_name(default_session_name.clone())
182            .build()
183            .await;
184
185        // Then we create the provider that will assume the end user's role.
186        // Here, we *must* install the external ID, as we're using the jump role
187        // to hop into the end user's AWS account, and the external ID is the
188        // only thing that allows them to limit their trust of the jump role to
189        // this specific Materialize environment and AWS connection. We also
190        // respect the user's configured session name, if any, as this is the
191        // session that will be visible to them.
192        let mut credentials = AssumeRoleProvider::builder(&self.arn)
193            .configure(&assume_role_sdk_config)
194            .session_name(self.session_name.clone().unwrap_or(default_session_name));
195        if let Some(external_id) = external_id {
196            credentials = credentials.external_id(external_id);
197        }
198        Ok(credentials.build_from_provider(jump_credentials).await)
199    }
200
201    pub fn external_id(
202        &self,
203        connection_context: &ConnectionContext,
204        connection_id: CatalogItemId,
205    ) -> Result<String, anyhow::Error> {
206        let Some(aws_external_id_prefix) = &connection_context.aws_external_id_prefix else {
207            bail!("internal error: no AWS external ID prefix configured");
208        };
209        Ok(format!("mz_{}_{}", aws_external_id_prefix, connection_id))
210    }
211
212    pub fn example_trust_policy(
213        &self,
214        connection_context: &ConnectionContext,
215        connection_id: CatalogItemId,
216    ) -> Result<serde_json::Value, anyhow::Error> {
217        let Some(aws_connection_role_arn) = &connection_context.aws_connection_role_arn else {
218            bail!("internal error: no AWS connection role configured");
219        };
220        Ok(json!(
221            {
222            "Version": "2012-10-17",
223            "Statement": [
224              {
225                "Effect": "Allow",
226                "Principal": {
227                  "AWS": aws_connection_role_arn
228                },
229                "Action": "sts:AssumeRole",
230                "Condition": {
231                  "StringEquals": {
232                    "sts:ExternalId": self.external_id(connection_context, connection_id)?
233                  }
234                }
235              }
236            ]
237          }
238        ))
239    }
240}
241
242impl AwsConnection {
243    /// Loads the AWS SDK configuration with the configuration specified on this
244    /// object.
245    pub async fn load_sdk_config(
246        &self,
247        connection_context: &ConnectionContext,
248        connection_id: CatalogItemId,
249        in_task: InTask,
250    ) -> Result<SdkConfig, anyhow::Error> {
251        let connection_context = connection_context.clone();
252        let this = self.clone();
253        // This entire block is wrapped in a `run_in_task_if`, so the inner futures are
254        // run in-line with `InTask::No`.
255        async move {
256            let credentials = match &this.auth {
257                AwsAuth::Credentials(credentials) => SharedCredentialsProvider::new(
258                    credentials
259                        .load_credentials_provider(&connection_context, InTask::No)
260                        .await?,
261                ),
262                AwsAuth::AssumeRole(assume_role) => SharedCredentialsProvider::new(
263                    assume_role
264                        .load_credentials_provider(&connection_context, connection_id)
265                        .await?,
266                ),
267            };
268            this.load_sdk_config_from_credentials(credentials).await
269        }
270        .run_in_task_if(in_task, || "load_sdk_config".to_string())
271        .await
272    }
273
274    async fn load_sdk_config_from_credentials(
275        &self,
276        credentials: impl ProvideCredentials + 'static,
277    ) -> Result<SdkConfig, anyhow::Error> {
278        let mut loader = mz_aws_util::defaults().credentials_provider(credentials);
279        if let Some(region) = &self.region {
280            loader = loader.region(Region::new(region.clone()));
281        }
282        if let Some(endpoint) = &self.endpoint {
283            loader = loader.endpoint_url(endpoint);
284        }
285        Ok(loader.load().await)
286    }
287
288    pub(crate) async fn validate(
289        &self,
290        id: CatalogItemId,
291        storage_configuration: &StorageConfiguration,
292    ) -> Result<(), AwsConnectionValidationError> {
293        let aws_config = self
294            .load_sdk_config(
295                &storage_configuration.connection_context,
296                id,
297                // We are in a normal tokio context during validation, already.
298                InTask::No,
299            )
300            .await?;
301        let sts_client = aws_sdk_sts::Client::new(&aws_config);
302        let _ = sts_client.get_caller_identity().send().await?;
303
304        if let AwsAuth::AssumeRole(assume_role) = &self.auth {
305            // Per AWS's recommendation, when validating a connection using
306            // `AssumeRole` authentication, we should ensure that the
307            // role rejects `AssumeRole` requests that don't specify an
308            // external ID.
309            let external_id = None;
310            let credentials = assume_role
311                .dangerously_load_credentials_provider(
312                    &storage_configuration.connection_context,
313                    id,
314                    external_id,
315                )
316                .await?;
317            let aws_config = self.load_sdk_config_from_credentials(credentials).await?;
318            let sts_client = aws_sdk_sts::Client::new(&aws_config);
319            if sts_client.get_caller_identity().send().await.is_ok() {
320                return Err(AwsConnectionValidationError::RoleDoesNotRequireExternalId {
321                    role_arn: assume_role.arn.clone(),
322                });
323            }
324        }
325
326        Ok(())
327    }
328
329    pub(crate) fn validate_by_default(&self) -> bool {
330        false
331    }
332}
333
/// An error returned by `AwsConnection::validate`.
#[derive(thiserror::Error, Debug)]
pub enum AwsConnectionValidationError {
    /// The role could be assumed without supplying an external ID.
    ///
    /// `role_arn` is not part of the error message itself; it is surfaced via
    /// `detail`.
    #[error("role trust policy does not require an external ID")]
    RoleDoesNotRequireExternalId { role_arn: String },
    /// The STS `GetCallerIdentity` call failed.
    #[error("{}", .0.display_with_causes())]
    StsGetCallerIdentityError(#[from] SdkError<GetCallerIdentityError>),
    /// Any other failure encountered during validation.
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}
344
345impl AwsConnectionValidationError {
346    /// Reports additional details about the error, if any are available.
347    pub fn detail(&self) -> Option<String> {
348        match self {
349            AwsConnectionValidationError::RoleDoesNotRequireExternalId { role_arn } => {
350                Some(format!(
351                    "The trust policy for the connection's role ({role_arn}) is insecure and allows any Materialize customer to assume the role."
352                ))
353            }
354            _ => None,
355        }
356    }
357
358    /// Reports a hint for the user about how the error could be fixed.
359    pub fn hint(&self) -> Option<String> {
360        match self {
361            AwsConnectionValidationError::RoleDoesNotRequireExternalId { .. } => {
362                Some("See: https://materialize.com/s/aws-connection-role-trust-policy".into())
363            }
364            _ => None,
365        }
366    }
367}
368
/// References an AWS connection.
///
/// The type parameter `C` determines the type of the `connection` field
/// (`C::Aws`); it defaults to `InlinedConnection`.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsConnectionReference<C: ConnectionAccess = InlinedConnection> {
    /// ID of the AWS connection.
    pub connection_id: CatalogItemId,
    /// AWS connection object.
    pub connection: C::Aws,
}
377
378impl<R: ConnectionResolver> IntoInlineConnection<AwsConnectionReference, R>
379    for AwsConnectionReference<ReferencedConnection>
380{
381    fn into_inline_connection(self, r: R) -> AwsConnectionReference {
382        let AwsConnectionReference {
383            connection,
384            connection_id,
385        } = self;
386
387        AwsConnectionReference {
388            connection: r.resolve_connection(connection).unwrap_aws(),
389            connection_id,
390        }
391    }
392}