1// Copyright 2024 Yuhao Su. All rights reserved.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// This file is derived from:
5//
6// https://github.com/yuhao-su/aws-msk-iam-sasl-signer-rs
7//
8// It was incorporated directly into Materialize on August 1, 2024.
9//
10// Licensed under the Apache License, Version 2.0 (the "License");
11// you may not use this file except in compliance with the License.
12// You may obtain a copy of the License in the LICENSE file at the
13// root of this repository, or online at
14//
15// http://www.apache.org/licenses/LICENSE-2.0
16//
17// Unless required by applicable law or agreed to in writing, software
18// distributed under the License is distributed on an "AS IS" BASIS,
19// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20// See the License for the specific language governing permissions and
21// limitations under the License.
2223//! AWS integration for Kafka.
2425use std::time::{Duration, SystemTime};
2627use anyhow::{Context, bail};
28use aws_credential_types::Credentials;
29use aws_credential_types::provider::ProvideCredentials;
30use aws_credential_types::provider::error::CredentialsError;
31use aws_types::SdkConfig;
32use aws_types::region::Region;
33use base64::Engine;
34use base64::prelude::BASE64_URL_SAFE_NO_PAD;
35use chrono::NaiveDateTime;
36use thiserror::Error;
37use url::Url;
/// Lifetime, in seconds, of a signed URL (15 minutes).
///
/// Used both as the `expires_in` signing setting and to compute the
/// expiration timestamp reported alongside the auth token.
const DEFAULT_EXPIRY_SECONDS: u32 = 15 * 60;
4142/// An error while signing an AWS IAM URL.
43#[derive(Error, Debug)]
44pub enum SignerError {
45/// An error while fetching AWS credentials.
46#[error("failed to provide credentials: {0}")]
47ProvideCredentials(#[from] CredentialsError),
48/// An error constructing the authentication token.
49#[error("failed constuct auth token: {0}")]
50ConstructAuthToken(String),
51}
5253/// Generate a base64-encoded signed url as an auth token by loading IAM
54/// credentials from an AWS credentials provider.
55pub async fn generate_auth_token(sdk_config: &SdkConfig) -> Result<(String, i64), anyhow::Error> {
56let Some(region) = sdk_config.region() else {
57bail!("internal error: AWS configuration missing region");
58 };
5960let Some(credentials_provider) = sdk_config.credentials_provider() else {
61bail!("internal error: AWS configuration missing credentials");
62 };
63let credentials = credentials_provider.provide_credentials().await?;
6465// TODO: figure out how to generate the endpoint from the SDK configuration
66 // to support localstack, FIPS, etc. The SDK does not make this easy, so for
67 // now we just hardcode the endpoint construction for the major AWS regions.
68let endpoint_url = format!("https://kafka.{}.amazonaws.com", region);
6970let mut url = build_url(&endpoint_url).context("failed to build request for signing")?;
7172 sign_url(&mut url, region, credentials).context("failed to sign request with aws sig v4")?;
7374let expiration_time_ms =
75 get_expiration_time_ms(&url).context("failed to extract expiration from signed url")?;
7677 url.query_pairs_mut()
78 .append_pair("User-Agent", "materialize");
7980Ok((base64_encode(url), expiration_time_ms))
81}
8283fn build_url(endpoint_url: &str) -> Result<Url, anyhow::Error> {
84let mut url = Url::parse(endpoint_url).context("failed to parse url: {e}")?;
85 url.query_pairs_mut()
86 .append_pair("Action", "kafka-cluster:Connect");
87Ok(url)
88}
8990fn sign_url(url: &mut Url, region: &Region, credentials: Credentials) -> Result<(), anyhow::Error> {
91use aws_sigv4::http_request::{
92 SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
93 };
94use aws_sigv4::sign::v4;
9596let mut signing_settings = SigningSettings::default();
97 signing_settings.signature_location = SignatureLocation::QueryParams;
98 signing_settings.expires_in = Some(Duration::from_secs(u64::from(DEFAULT_EXPIRY_SECONDS)));
99let identity = credentials.into();
100let signing_params = v4::SigningParams::builder()
101 .identity(&identity)
102 .region(region.as_ref())
103 .name("kafka-cluster")
104 .time(SystemTime::now())
105 .settings(signing_settings)
106 .build()
107 .context("failed to build signing parameters")?;
108let signable_request = SignableRequest::new(
109"GET",
110 url.as_str(),
111 std::iter::empty(),
112 SignableBody::Bytes(&[]),
113 )
114 .expect("signable request");
115116let sign_output =
117 sign(signable_request, &signing_params.into()).context("failed to build sign request")?;
118let (sign_instructions, _) = sign_output.into_parts();
119120let mut url_queries = url.query_pairs_mut();
121for (name, value) in sign_instructions.params() {
122 url_queries.append_pair(name, value);
123 }
124Ok(())
125}
126127fn get_expiration_time_ms(signed_url: &Url) -> Result<i64, anyhow::Error> {
128let (_name, value) = &signed_url
129 .query_pairs()
130 .find(|(name, _value)| name == "X-Amz-Date")
131 .unwrap_or_else(|| ("".into(), "".into()));
132133let date_time = NaiveDateTime::parse_from_str(value, "%Y%m%dT%H%M%SZ")
134 .with_context(|| format!("failed to parse 'X-Amz-Date' param {value} from signed url"))?;
135136let signing_time_ms = date_time.and_utc().timestamp_millis();
137138Ok(signing_time_ms + i64::from(DEFAULT_EXPIRY_SECONDS) * 1000)
139}
140141fn base64_encode(signed_url: Url) -> String {
142 BASE64_URL_SAFE_NO_PAD.encode(signed_url.as_str().as_bytes())
143}