1use std::time::{Duration, SystemTime};
26
27use anyhow::{Context, bail};
28use aws_credential_types::Credentials;
29use aws_credential_types::provider::ProvideCredentials;
30use aws_credential_types::provider::error::CredentialsError;
31use aws_types::SdkConfig;
32use aws_types::region::Region;
33use base64::Engine;
34use base64::prelude::BASE64_URL_SAFE_NO_PAD;
35use chrono::NaiveDateTime;
36use thiserror::Error;
37use url::Url;
38
39const DEFAULT_EXPIRY_SECONDS: u32 = 900;
41
42#[derive(Error, Debug)]
44pub enum SignerError {
45 #[error("failed to provide credentials: {0}")]
47 ProvideCredentials(#[from] CredentialsError),
48 #[error("failed constuct auth token: {0}")]
50 ConstructAuthToken(String),
51}
52
53pub async fn generate_auth_token(sdk_config: &SdkConfig) -> Result<(String, i64), anyhow::Error> {
56 let Some(region) = sdk_config.region() else {
57 bail!("internal error: AWS configuration missing region");
58 };
59
60 let Some(credentials_provider) = sdk_config.credentials_provider() else {
61 bail!("internal error: AWS configuration missing credentials");
62 };
63 let credentials = credentials_provider.provide_credentials().await?;
64
65 let endpoint_url = format!("https://kafka.{}.amazonaws.com", region);
69
70 let mut url = build_url(&endpoint_url).context("failed to build request for signing")?;
71
72 sign_url(&mut url, region, credentials).context("failed to sign request with aws sig v4")?;
73
74 let expiration_time_ms =
75 get_expiration_time_ms(&url).context("failed to extract expiration from signed url")?;
76
77 url.query_pairs_mut()
78 .append_pair("User-Agent", "materialize");
79
80 Ok((base64_encode(url), expiration_time_ms))
81}
82
83fn build_url(endpoint_url: &str) -> Result<Url, anyhow::Error> {
84 let mut url = Url::parse(endpoint_url).context("failed to parse url: {e}")?;
85 url.query_pairs_mut()
86 .append_pair("Action", "kafka-cluster:Connect");
87 Ok(url)
88}
89
90fn sign_url(url: &mut Url, region: &Region, credentials: Credentials) -> Result<(), anyhow::Error> {
91 use aws_sigv4::http_request::{
92 SignableBody, SignableRequest, SignatureLocation, SigningSettings, sign,
93 };
94 use aws_sigv4::sign::v4;
95
96 let mut signing_settings = SigningSettings::default();
97 signing_settings.signature_location = SignatureLocation::QueryParams;
98 signing_settings.expires_in = Some(Duration::from_secs(u64::from(DEFAULT_EXPIRY_SECONDS)));
99 let identity = credentials.into();
100 let signing_params = v4::SigningParams::builder()
101 .identity(&identity)
102 .region(region.as_ref())
103 .name("kafka-cluster")
104 .time(SystemTime::now())
105 .settings(signing_settings)
106 .build()
107 .context("failed to build signing parameters")?;
108 let signable_request = SignableRequest::new(
109 "GET",
110 url.as_str(),
111 std::iter::empty(),
112 SignableBody::Bytes(&[]),
113 )
114 .expect("signable request");
115
116 let sign_output =
117 sign(signable_request, &signing_params.into()).context("failed to build sign request")?;
118 let (sign_instructions, _) = sign_output.into_parts();
119
120 let mut url_queries = url.query_pairs_mut();
121 for (name, value) in sign_instructions.params() {
122 url_queries.append_pair(name, value);
123 }
124 Ok(())
125}
126
127fn get_expiration_time_ms(signed_url: &Url) -> Result<i64, anyhow::Error> {
128 let (_name, value) = &signed_url
129 .query_pairs()
130 .find(|(name, _value)| name == "X-Amz-Date")
131 .unwrap_or_else(|| ("".into(), "".into()));
132
133 let date_time = NaiveDateTime::parse_from_str(value, "%Y%m%dT%H%M%SZ")
134 .with_context(|| format!("failed to parse 'X-Amz-Date' param {value} from signed url"))?;
135
136 let signing_time_ms = date_time.and_utc().timestamp_millis();
137
138 Ok(signing_time_ms + i64::from(DEFAULT_EXPIRY_SECONDS) * 1000)
139}
140
141fn base64_encode(signed_url: Url) -> String {
142 BASE64_URL_SAFE_NO_PAD.encode(signed_url.as_str().as_bytes())
143}