mz_aws_secrets_controller/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::sync::Arc;
12use std::time::Instant;
13
14use anyhow::anyhow;
15use async_trait::async_trait;
16use aws_config::SdkConfig;
17use aws_sdk_secretsmanager::Client;
18use aws_sdk_secretsmanager::config::Builder as SecretsManagerConfigBuilder;
19use aws_sdk_secretsmanager::error::SdkError;
20use aws_sdk_secretsmanager::primitives::Blob;
21use aws_sdk_secretsmanager::types::{Filter, FilterNameStringType, Tag};
22use mz_repr::CatalogItemId;
23use mz_secrets::{SecretsController, SecretsReader};
24use tracing::info;
25use uuid::Uuid;
26
27#[derive(Clone, Debug)]
28pub struct AwsSecretsController {
29    pub client: AwsSecretsClient,
30    pub kms_key_alias: String,
31    pub default_tags: BTreeMap<String, String>,
32}
33
34pub async fn load_sdk_config() -> SdkConfig {
35    let mut config_loader = mz_aws_util::defaults();
36    if let Ok(endpoint) = std::env::var("AWS_ENDPOINT_URL") {
37        config_loader = config_loader.endpoint_url(endpoint);
38    }
39    config_loader.load().await
40}
41
42async fn load_secrets_manager_client() -> Client {
43    let sdk_config = load_sdk_config().await;
44    let sm_config = SecretsManagerConfigBuilder::from(&sdk_config).build();
45    Client::from_conf(sm_config)
46}
47
48impl AwsSecretsController {
49    pub async fn new(
50        prefix: &str,
51        key_alias: &str,
52        default_tags: BTreeMap<String, String>,
53    ) -> Self {
54        AwsSecretsController {
55            client: AwsSecretsClient::new(prefix).await,
56            kms_key_alias: key_alias.to_string(),
57            default_tags,
58        }
59    }
60
61    fn tags(&self) -> Vec<Tag> {
62        self.default_tags
63            .iter()
64            .map(|(key, value)| Tag::builder().key(key).value(value).build())
65            .collect()
66    }
67}
68
69#[async_trait]
70impl SecretsController for AwsSecretsController {
71    async fn ensure(&self, id: CatalogItemId, contents: &[u8]) -> Result<(), anyhow::Error> {
72        match self
73            .client
74            .client
75            .create_secret()
76            .name(self.client.secret_name(id))
77            .kms_key_id(self.kms_key_alias.clone())
78            .secret_binary(Blob::new(contents))
79            .set_tags(Some(self.tags()))
80            .send()
81            .await
82        {
83            Ok(_) => {}
84            Err(SdkError::ServiceError(e)) if e.err().is_resource_exists_exception() => {
85                self.client
86                    .client
87                    .put_secret_value()
88                    .secret_id(self.client.secret_name(id))
89                    .secret_binary(Blob::new(contents))
90                    .send()
91                    .await?;
92            }
93            Err(e) => Err(e)?,
94        }
95        Ok(())
96    }
97
98    async fn delete(&self, id: CatalogItemId) -> Result<(), anyhow::Error> {
99        match self
100            .client
101            .client
102            .delete_secret()
103            .secret_id(self.client.secret_name(id))
104            .force_delete_without_recovery(true)
105            .send()
106            .await
107        {
108            Ok(_) => Ok(()),
109            // Secret is already deleted.
110            Err(SdkError::ServiceError(e)) if e.err().is_resource_not_found_exception() => Ok(()),
111            Err(e) => {
112                return Err(e.into());
113            }
114        }
115    }
116
117    async fn list(&self) -> Result<Vec<CatalogItemId>, anyhow::Error> {
118        let mut ids = Vec::new();
119        let mut filters = self.default_tags.iter().fold(
120            Vec::with_capacity(self.default_tags.len() * 2 + 1),
121            |mut filters, (key, value)| {
122                filters.push(
123                    Filter::builder()
124                        .key(FilterNameStringType::TagKey)
125                        .values(key)
126                        .build(),
127                );
128                filters.push(
129                    Filter::builder()
130                        .key(FilterNameStringType::TagValue)
131                        .values(value)
132                        .build(),
133                );
134                filters
135            },
136        );
137        filters.push(
138            Filter::builder()
139                .key(FilterNameStringType::Name)
140                .values(&self.client.secret_name_prefix)
141                .build(),
142        );
143        let mut secrets_paginator = self
144            .client
145            .client
146            .list_secrets()
147            .set_filters(Some(filters))
148            .into_paginator()
149            .send();
150        while let Some(page) = secrets_paginator.next().await {
151            for secret in page?.secret_list() {
152                let required_tags_count: usize = secret
153                    .tags()
154                    .into_iter()
155                    .filter_map(|tag| {
156                        tag.key().and_then(|key| {
157                            if self.default_tags.contains_key(key)
158                                && tag.value() == self.default_tags.get(key).map(|x| x.as_str())
159                            {
160                                Some(1)
161                            } else {
162                                None
163                            }
164                        })
165                    })
166                    .sum();
167                // Ignore improperly tagged objects.
168                if required_tags_count != self.default_tags.len() {
169                    continue;
170                }
171                // Ignore invalidly named objects.
172                let Some(id) = self.client.id_from_secret_name(secret.name().unwrap()) else {
173                    continue;
174                };
175                ids.push(id);
176            }
177        }
178        Ok(ids)
179    }
180
181    fn reader(&self) -> Arc<dyn SecretsReader> {
182        Arc::new(self.client.clone())
183    }
184}
185
186#[derive(Clone, Debug)]
187pub struct AwsSecretsClient {
188    pub(crate) client: Client,
189    pub(crate) secret_name_prefix: String,
190}
191
192impl AwsSecretsClient {
193    pub async fn new(prefix: &str) -> Self {
194        Self {
195            client: load_secrets_manager_client().await,
196            // TODO [Alex Hunt] move this to a shared function that can be imported by the
197            // region-controller.
198            secret_name_prefix: prefix.to_owned(),
199        }
200    }
201
202    fn secret_name(&self, id: CatalogItemId) -> String {
203        format!("{}{}", self.secret_name_prefix, id)
204    }
205
206    fn id_from_secret_name(&self, name: &str) -> Option<CatalogItemId> {
207        name.strip_prefix(&self.secret_name_prefix)
208            .and_then(|id| id.parse().ok())
209    }
210}
211
212#[async_trait]
213impl SecretsReader for AwsSecretsClient {
214    async fn read(&self, id: CatalogItemId) -> Result<Vec<u8>, anyhow::Error> {
215        let op_id = Uuid::new_v4();
216        info!(secret_id = %id, %op_id, "reading secret from AWS");
217        let start = Instant::now();
218        let secret = async {
219            Ok(self
220                .client
221                .get_secret_value()
222                .secret_id(self.secret_name(id))
223                .send()
224                .await?
225                .secret_binary()
226                .ok_or_else(|| anyhow!("internal error: secret missing secret_binary field"))?
227                .to_owned()
228                .into_inner())
229        }
230        .await;
231        info!(%op_id, success = %secret.is_ok(), "secret read in {:?}", start.elapsed());
232        secret
233    }
234}