mz_orchestrator_kubernetes/
secrets.rs
1use std::iter;
13use std::sync::Arc;
14
15use anyhow::anyhow;
16use async_trait::async_trait;
17use k8s_openapi::ByteString;
18use k8s_openapi::api::core::v1::Secret;
19use kube::Api;
20use kube::api::{DeleteParams, ListParams, ObjectMeta, Patch, PatchParams};
21use mz_repr::CatalogItemId;
22use mz_secrets::{SecretsController, SecretsReader};
23
24use crate::{FIELD_MANAGER, KubernetesOrchestrator, util};
25
26#[async_trait]
27impl SecretsController for KubernetesOrchestrator {
28 async fn ensure(&self, id: CatalogItemId, contents: &[u8]) -> Result<(), anyhow::Error> {
29 let name = secret_name(id, &self.config.name_prefix());
30 let data = iter::once(("contents".into(), ByteString(contents.into())));
31 let secret = Secret {
32 metadata: ObjectMeta {
33 name: Some(name.clone()),
34 ..Default::default()
35 },
36 data: Some(data.collect()),
37 ..Default::default()
38 };
39 self.secret_api
40 .patch(
41 &name,
42 &PatchParams::apply(FIELD_MANAGER).force(),
43 &Patch::Apply(secret),
44 )
45 .await?;
46 Ok(())
47 }
48
49 async fn delete(&self, id: CatalogItemId) -> Result<(), anyhow::Error> {
50 match self
55 .secret_api
56 .delete(
57 &secret_name(id, &self.config.name_prefix()),
58 &DeleteParams::default(),
59 )
60 .await
61 {
62 Ok(_) => Ok(()),
63 Err(kube::Error::Api(e)) if e.code == 404 => Ok(()),
65 Err(e) => return Err(e.into()),
66 }
67 }
68
69 async fn list(&self) -> Result<Vec<CatalogItemId>, anyhow::Error> {
70 let objs = self.secret_api.list(&ListParams::default()).await?;
71 let mut ids = Vec::new();
72 for item in objs.items {
73 let Some(name) = item.metadata.name else {
75 continue;
76 };
77 let Some(id) = from_secret_name(&name, &self.config.name_prefix()) else {
79 continue;
80 };
81 ids.push(id);
82 }
83 Ok(ids)
84 }
85
86 fn reader(&self) -> Arc<dyn SecretsReader> {
87 Arc::new(KubernetesSecretsReader {
88 secret_api: self.secret_api.clone(),
89 name_prefix: self.config.name_prefix(),
90 })
91 }
92}
93
94#[derive(Debug)]
96pub struct KubernetesSecretsReader {
97 secret_api: Api<Secret>,
98 name_prefix: String,
99}
100
101impl KubernetesSecretsReader {
102 pub async fn new(
107 context: String,
108 name_prefix: Option<String>,
109 ) -> Result<KubernetesSecretsReader, anyhow::Error> {
110 let (client, _) = util::create_client(context).await?;
111 let secret_api: Api<Secret> = Api::default_namespaced(client);
112 let name_prefix = name_prefix.clone().unwrap_or_default();
113 Ok(KubernetesSecretsReader {
114 secret_api,
115 name_prefix,
116 })
117 }
118}
119
120#[async_trait]
121impl SecretsReader for KubernetesSecretsReader {
122 async fn read(&self, id: CatalogItemId) -> Result<Vec<u8>, anyhow::Error> {
123 let secret = self
124 .secret_api
125 .get(&secret_name(id, &self.name_prefix))
126 .await?;
127 let mut data = secret
128 .data
129 .ok_or_else(|| anyhow!("internal error: secret missing data field"))?;
130 let contents = data
131 .remove("contents")
132 .ok_or_else(|| anyhow!("internal error: secret missing contents field"))?;
133 Ok(contents.0)
134 }
135}
136
137const SECRET_NAME_PREFIX: &str = "user-managed-";
138
139fn secret_name(id: CatalogItemId, name_prefix: &str) -> String {
140 format!("{name_prefix}{SECRET_NAME_PREFIX}{id}")
141}
142
143fn from_secret_name(name: &str, name_prefix: &str) -> Option<CatalogItemId> {
144 name.strip_prefix(&name_prefix)
145 .and_then(|name| name.strip_prefix(SECRET_NAME_PREFIX))
146 .and_then(|id| id.parse().ok())
147}