mz_aws_secrets_controller/
lib.rs
1use std::collections::BTreeMap;
11use std::sync::Arc;
12use std::time::Instant;
13
14use anyhow::anyhow;
15use async_trait::async_trait;
16use aws_config::SdkConfig;
17use aws_sdk_secretsmanager::Client;
18use aws_sdk_secretsmanager::config::Builder as SecretsManagerConfigBuilder;
19use aws_sdk_secretsmanager::error::SdkError;
20use aws_sdk_secretsmanager::primitives::Blob;
21use aws_sdk_secretsmanager::types::{Filter, FilterNameStringType, Tag};
22use mz_repr::CatalogItemId;
23use mz_secrets::{SecretsController, SecretsReader};
24use tracing::info;
25use uuid::Uuid;
26
27#[derive(Clone, Debug)]
28pub struct AwsSecretsController {
29 pub client: AwsSecretsClient,
30 pub kms_key_alias: String,
31 pub default_tags: BTreeMap<String, String>,
32}
33
34pub async fn load_sdk_config() -> SdkConfig {
35 let mut config_loader = mz_aws_util::defaults();
36 if let Ok(endpoint) = std::env::var("AWS_ENDPOINT_URL") {
37 config_loader = config_loader.endpoint_url(endpoint);
38 }
39 config_loader.load().await
40}
41
42async fn load_secrets_manager_client() -> Client {
43 let sdk_config = load_sdk_config().await;
44 let sm_config = SecretsManagerConfigBuilder::from(&sdk_config).build();
45 Client::from_conf(sm_config)
46}
47
48impl AwsSecretsController {
49 pub async fn new(
50 prefix: &str,
51 key_alias: &str,
52 default_tags: BTreeMap<String, String>,
53 ) -> Self {
54 AwsSecretsController {
55 client: AwsSecretsClient::new(prefix).await,
56 kms_key_alias: key_alias.to_string(),
57 default_tags,
58 }
59 }
60
61 fn tags(&self) -> Vec<Tag> {
62 self.default_tags
63 .iter()
64 .map(|(key, value)| Tag::builder().key(key).value(value).build())
65 .collect()
66 }
67}
68
69#[async_trait]
70impl SecretsController for AwsSecretsController {
71 async fn ensure(&self, id: CatalogItemId, contents: &[u8]) -> Result<(), anyhow::Error> {
72 match self
73 .client
74 .client
75 .create_secret()
76 .name(self.client.secret_name(id))
77 .kms_key_id(self.kms_key_alias.clone())
78 .secret_binary(Blob::new(contents))
79 .set_tags(Some(self.tags()))
80 .send()
81 .await
82 {
83 Ok(_) => {}
84 Err(SdkError::ServiceError(e)) if e.err().is_resource_exists_exception() => {
85 self.client
86 .client
87 .put_secret_value()
88 .secret_id(self.client.secret_name(id))
89 .secret_binary(Blob::new(contents))
90 .send()
91 .await?;
92 }
93 Err(e) => Err(e)?,
94 }
95 Ok(())
96 }
97
98 async fn delete(&self, id: CatalogItemId) -> Result<(), anyhow::Error> {
99 match self
100 .client
101 .client
102 .delete_secret()
103 .secret_id(self.client.secret_name(id))
104 .force_delete_without_recovery(true)
105 .send()
106 .await
107 {
108 Ok(_) => Ok(()),
109 Err(SdkError::ServiceError(e)) if e.err().is_resource_not_found_exception() => Ok(()),
111 Err(e) => {
112 return Err(e.into());
113 }
114 }
115 }
116
117 async fn list(&self) -> Result<Vec<CatalogItemId>, anyhow::Error> {
118 let mut ids = Vec::new();
119 let mut filters = self.default_tags.iter().fold(
120 Vec::with_capacity(self.default_tags.len() * 2 + 1),
121 |mut filters, (key, value)| {
122 filters.push(
123 Filter::builder()
124 .key(FilterNameStringType::TagKey)
125 .values(key)
126 .build(),
127 );
128 filters.push(
129 Filter::builder()
130 .key(FilterNameStringType::TagValue)
131 .values(value)
132 .build(),
133 );
134 filters
135 },
136 );
137 filters.push(
138 Filter::builder()
139 .key(FilterNameStringType::Name)
140 .values(&self.client.secret_name_prefix)
141 .build(),
142 );
143 let mut secrets_paginator = self
144 .client
145 .client
146 .list_secrets()
147 .set_filters(Some(filters))
148 .into_paginator()
149 .send();
150 while let Some(page) = secrets_paginator.next().await {
151 for secret in page?.secret_list() {
152 let required_tags_count: usize = secret
153 .tags()
154 .into_iter()
155 .filter_map(|tag| {
156 tag.key().and_then(|key| {
157 if self.default_tags.contains_key(key)
158 && tag.value() == self.default_tags.get(key).map(|x| x.as_str())
159 {
160 Some(1)
161 } else {
162 None
163 }
164 })
165 })
166 .sum();
167 if required_tags_count != self.default_tags.len() {
169 continue;
170 }
171 let Some(id) = self.client.id_from_secret_name(secret.name().unwrap()) else {
173 continue;
174 };
175 ids.push(id);
176 }
177 }
178 Ok(ids)
179 }
180
181 fn reader(&self) -> Arc<dyn SecretsReader> {
182 Arc::new(self.client.clone())
183 }
184}
185
186#[derive(Clone, Debug)]
187pub struct AwsSecretsClient {
188 pub(crate) client: Client,
189 pub(crate) secret_name_prefix: String,
190}
191
192impl AwsSecretsClient {
193 pub async fn new(prefix: &str) -> Self {
194 Self {
195 client: load_secrets_manager_client().await,
196 secret_name_prefix: prefix.to_owned(),
199 }
200 }
201
202 fn secret_name(&self, id: CatalogItemId) -> String {
203 format!("{}{}", self.secret_name_prefix, id)
204 }
205
206 fn id_from_secret_name(&self, name: &str) -> Option<CatalogItemId> {
207 name.strip_prefix(&self.secret_name_prefix)
208 .and_then(|id| id.parse().ok())
209 }
210}
211
212#[async_trait]
213impl SecretsReader for AwsSecretsClient {
214 async fn read(&self, id: CatalogItemId) -> Result<Vec<u8>, anyhow::Error> {
215 let op_id = Uuid::new_v4();
216 info!(secret_id = %id, %op_id, "reading secret from AWS");
217 let start = Instant::now();
218 let secret = async {
219 Ok(self
220 .client
221 .get_secret_value()
222 .secret_id(self.secret_name(id))
223 .send()
224 .await?
225 .secret_binary()
226 .ok_or_else(|| anyhow!("internal error: secret missing secret_binary field"))?
227 .to_owned()
228 .into_inner())
229 }
230 .await;
231 info!(%op_id, success = %secret.is_ok(), "secret read in {:?}", start.elapsed());
232 secret
233 }
234}