mz_kafka_util/
aws.rs

1// Copyright 2024 Yuhao Su. All rights reserved.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// This file is derived from:
5//
6//    https://github.com/yuhao-su/aws-msk-iam-sasl-signer-rs
7//
8// It was incorporated directly into Materialize on August 1, 2024.
9//
10// Licensed under the Apache License, Version 2.0 (the "License");
11// you may not use this file except in compliance with the License.
12// You may obtain a copy of the License in the LICENSE file at the
13// root of this repository, or online at
14//
15//     http://www.apache.org/licenses/LICENSE-2.0
16//
17// Unless required by applicable law or agreed to in writing, software
18// distributed under the License is distributed on an "AS IS" BASIS,
19// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20// See the License for the specific language governing permissions and
21// limitations under the License.
22
23//! AWS integration for Kafka.
24
25use std::time::{Duration, SystemTime};
26
27use anyhow::{Context, bail};
28use aws_credential_types::Credentials;
29use aws_credential_types::provider::ProvideCredentials;
30use aws_credential_types::provider::error::CredentialsError;
31use aws_types::SdkConfig;
32use aws_types::region::Region;
33use base64::Engine;
34use base64::prelude::BASE64_URL_SAFE_NO_PAD;
35use chrono::NaiveDateTime;
36use thiserror::Error;
37use url::Url;
38
39/// The default expiration time in seconds.
40const DEFAULT_EXPIRY_SECONDS: u32 = 900;
41
42/// An error while signing an AWS IAM URL.
43#[derive(Error, Debug)]
44pub enum SignerError {
45    /// An error while fetching AWS credentials.
46    #[error("failed to provide credentials: {0}")]
47    ProvideCredentials(#[from] CredentialsError),
48    /// An error constructing the authentication token.
49    #[error("failed constuct auth token: {0}")]
50    ConstructAuthToken(String),
51}
52
53/// Generate a base64-encoded signed url as an auth token by loading IAM
54/// credentials from an AWS credentials provider.
55pub async fn generate_auth_token(sdk_config: &SdkConfig) -> Result<(String, i64), anyhow::Error> {
56    let Some(region) = sdk_config.region() else {
57        bail!("internal error: AWS configuration missing region");
58    };
59
60    let Some(credentials_provider) = sdk_config.credentials_provider() else {
61        bail!("internal error: AWS configuration missing credentials");
62    };
63    let credentials = credentials_provider.provide_credentials().await?;
64
65    // TODO: figure out how to generate the endpoint from the SDK configuration
66    // to support localstack, FIPS, etc. The SDK does not make this easy, so for
67    // now we just hardcode the endpoint construction for the major AWS regions.
68    let endpoint_url = format!("https://kafka.{}.amazonaws.com", region);
69
70    let mut url = build_url(&endpoint_url).context("failed to build request for signing")?;
71
72    sign_url(&mut url, region, credentials).context("failed to sign request with aws sig v4")?;
73
74    let expiration_time_ms =
75        get_expiration_time_ms(&url).context("failed to extract expiration from signed url")?;
76
77    url.query_pairs_mut()
78        .append_pair("User-Agent", "materialize");
79
80    Ok((base64_encode(url), expiration_time_ms))
81}
82
83fn build_url(endpoint_url: &str) -> Result<Url, anyhow::Error> {
84    let mut url = Url::parse(endpoint_url).context("failed to parse url: {e}")?;
85    url.query_pairs_mut()
86        .append_pair("Action", "kafka-cluster:Connect");
87    Ok(url)
88}
89
90fn sign_url(url: &mut Url, region: &Region, credentials: Credentials) -> Result<(), anyhow::Error> {
91    use aws_sigv4::http_request::{
92        SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
93    };
94    use aws_sigv4::sign::v4;
95
96    let mut signing_settings = SigningSettings::default();
97    signing_settings.signature_location = SignatureLocation::QueryParams;
98    signing_settings.expires_in = Some(Duration::from_secs(u64::from(DEFAULT_EXPIRY_SECONDS)));
99    let identity = credentials.into();
100    let signing_params = v4::SigningParams::builder()
101        .identity(&identity)
102        .region(region.as_ref())
103        .name("kafka-cluster")
104        .time(SystemTime::now())
105        .settings(signing_settings)
106        .build()
107        .context("failed to build signing parameters")?;
108    let signable_request = SignableRequest::new(
109        "GET",
110        url.as_str(),
111        std::iter::empty(),
112        SignableBody::Bytes(&[]),
113    )
114    .expect("signable request");
115
116    let sign_output =
117        sign(signable_request, &signing_params.into()).context("failed to build sign request")?;
118    let (sign_instructions, _) = sign_output.into_parts();
119
120    let mut url_queries = url.query_pairs_mut();
121    for (name, value) in sign_instructions.params() {
122        url_queries.append_pair(name, value);
123    }
124    Ok(())
125}
126
127fn get_expiration_time_ms(signed_url: &Url) -> Result<i64, anyhow::Error> {
128    let (_name, value) = &signed_url
129        .query_pairs()
130        .find(|(name, _value)| name == "X-Amz-Date")
131        .unwrap_or_else(|| ("".into(), "".into()));
132
133    let date_time = NaiveDateTime::parse_from_str(value, "%Y%m%dT%H%M%SZ")
134        .with_context(|| format!("failed to parse 'X-Amz-Date' param {value} from signed url"))?;
135
136    let signing_time_ms = date_time.and_utc().timestamp_millis();
137
138    Ok(signing_time_ms + i64::from(DEFAULT_EXPIRY_SECONDS) * 1000)
139}
140
141fn base64_encode(signed_url: Url) -> String {
142    BASE64_URL_SAFE_NO_PAD.encode(signed_url.as_str().as_bytes())
143}