use std::iter;
use std::sync::Arc;
use anyhow::anyhow;
use async_trait::async_trait;
use k8s_openapi::api::core::v1::Secret;
use k8s_openapi::ByteString;
use kube::api::{DeleteParams, ListParams, ObjectMeta, Patch, PatchParams};
use kube::Api;
use mz_repr::GlobalId;
use mz_secrets::{SecretsController, SecretsReader};
use crate::{util, KubernetesOrchestrator, FIELD_MANAGER};
#[async_trait]
impl SecretsController for KubernetesOrchestrator {
    /// Writes `contents` into the Kubernetes secret that corresponds to `id`.
    ///
    /// The secret is submitted as a forced apply patch under `FIELD_MANAGER`,
    /// with the payload stored under the `contents` key of the secret's data.
    ///
    /// # Errors
    ///
    /// Returns an error if the patch request fails.
    async fn ensure(&self, id: GlobalId, contents: &[u8]) -> Result<(), anyhow::Error> {
        let name = secret_name(id);
        let data = iter::once(("contents".into(), ByteString(contents.into())));
        let secret = Secret {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                ..Default::default()
            },
            data: Some(data.collect()),
            ..Default::default()
        };
        self.secret_api
            .patch(
                &name,
                &PatchParams::apply(FIELD_MANAGER).force(),
                &Patch::Apply(secret),
            )
            .await?;
        Ok(())
    }

    /// Requests deletion of the Kubernetes secret that corresponds to `id`.
    ///
    /// A 404 response from the API is treated as success.
    ///
    /// # Errors
    ///
    /// Returns any other error produced by the delete request.
    async fn delete(&self, id: GlobalId) -> Result<(), anyhow::Error> {
        match self
            .secret_api
            .delete(&secret_name(id), &DeleteParams::default())
            .await
        {
            Ok(_) => Ok(()),
            // The API reported that no such secret exists, so there is
            // nothing left to delete.
            Err(kube::Error::Api(e)) if e.code == 404 => Ok(()),
            Err(e) => Err(e.into()),
        }
    }

    /// Lists the IDs of the secrets returned by the secret API.
    ///
    /// Objects that have no name, or whose name `from_secret_name` cannot
    /// convert back into a `GlobalId`, are skipped.
    ///
    /// # Errors
    ///
    /// Returns an error if the list request fails.
    async fn list(&self) -> Result<Vec<GlobalId>, anyhow::Error> {
        let objs = self.secret_api.list(&ListParams::default()).await?;
        let ids: Vec<GlobalId> = objs
            .items
            .into_iter()
            .filter_map(|item| item.metadata.name)
            .filter_map(|name| from_secret_name(&name))
            .collect();
        Ok(ids)
    }

    /// Returns a reader backed by a clone of this orchestrator's secret API
    /// handle.
    fn reader(&self) -> Arc<dyn SecretsReader> {
        Arc::new(KubernetesSecretsReader {
            secret_api: self.secret_api.clone(),
        })
    }
}
/// Reads secret contents out of Kubernetes `Secret` objects.
#[derive(Debug)]
pub struct KubernetesSecretsReader {
/// API handle used to fetch `Secret` objects.
secret_api: Api<Secret>,
}
impl KubernetesSecretsReader {
    /// Builds a reader from a client created for the given `context`.
    ///
    /// The client comes from `util::create_client`; the second element of the
    /// value it returns is not used here. The resulting secret API handle is
    /// obtained through `Api::default_namespaced`.
    ///
    /// # Errors
    ///
    /// Returns an error if the client cannot be created.
    pub async fn new(context: String) -> Result<KubernetesSecretsReader, anyhow::Error> {
        let (client, _) = util::create_client(context).await?;
        Ok(Self {
            secret_api: Api::default_namespaced(client),
        })
    }
}
#[async_trait]
impl SecretsReader for KubernetesSecretsReader {
async fn read(&self, id: GlobalId) -> Result<Vec<u8>, anyhow::Error> {
let secret = self.secret_api.get(&secret_name(id)).await?;
let mut data = secret
.data
.ok_or_else(|| anyhow!("internal error: secret missing data field"))?;
let contents = data
.remove("contents")
.ok_or_else(|| anyhow!("internal error: secret missing contents field"))?;
Ok(contents.0)
}
}
/// Prefix placed in front of a secret's ID to form the name of its
/// Kubernetes object; `from_secret_name` rejects names that lack it.
const SECRET_NAME_PREFIX: &str = "user-managed-";
/// Builds the Kubernetes object name for the secret identified by `id`:
/// `SECRET_NAME_PREFIX` followed by the ID's `Display` form.
fn secret_name(id: GlobalId) -> String {
    let mut name = String::from(SECRET_NAME_PREFIX);
    name.push_str(&id.to_string());
    name
}
/// Recovers the `GlobalId` encoded in a Kubernetes object name.
///
/// Returns `None` when `name` does not start with `SECRET_NAME_PREFIX` or
/// when the remainder does not parse as a `GlobalId`.
fn from_secret_name(name: &str) -> Option<GlobalId> {
    let encoded_id = name.strip_prefix(SECRET_NAME_PREFIX)?;
    encoded_id.parse().ok()
}