// Copyright Materialize, Inc. and contributors. All rights reserved.
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Management of user secrets via Kubernetes.
use std::iter;
use std::sync::Arc;
use anyhow::anyhow;
use async_trait::async_trait;
use k8s_openapi::api::core::v1::Secret;
use k8s_openapi::ByteString;
use kube::api::{DeleteParams, ListParams, ObjectMeta, Patch, PatchParams};
use kube::Api;
use mz_repr::CatalogItemId;
use mz_secrets::{SecretsController, SecretsReader};
use crate::{util, KubernetesOrchestrator, FIELD_MANAGER};
impl SecretsController for KubernetesOrchestrator {
async fn ensure(&self, id: CatalogItemId, contents: &[u8]) -> Result<(), anyhow::Error> {
let name = secret_name(id, &self.config.name_prefix());
let data = iter::once(("contents".into(), ByteString(contents.into())));
let secret = Secret {
metadata: ObjectMeta {
name: Some(name.clone()),
data: Some(data.collect()),
async fn delete(&self, id: CatalogItemId) -> Result<(), anyhow::Error> {
// We intentionally don't wait for the secret to be deleted; our
// obligation is only to initiate the deletion. Garbage collecting
// secrets that fail to delete will be the responsibility of a future
// garbage collection task.
match self
&secret_name(id, &self.config.name_prefix()),
Ok(_) => Ok(()),
// Secret is already deleted.
Err(kube::Error::Api(e)) if e.code == 404 => Ok(()),
Err(e) => return Err(e.into()),
async fn list(&self) -> Result<Vec<CatalogItemId>, anyhow::Error> {
let objs = self.secret_api.list(&ListParams::default()).await?;
let mut ids = Vec::new();
for item in objs.items {
// Ignore unnamed objects.
let Some(name) = item.metadata.name else {
// Ignore invalidly named objects.
let Some(id) = from_secret_name(&name, &self.config.name_prefix()) else {
fn reader(&self) -> Arc<dyn SecretsReader> {
Arc::new(KubernetesSecretsReader {
secret_api: self.secret_api.clone(),
name_prefix: self.config.name_prefix(),
/// Reads secrets managed by a [`KubernetesOrchestrator`].
pub struct KubernetesSecretsReader {
secret_api: Api<Secret>,
name_prefix: String,
impl KubernetesSecretsReader {
/// Constructs a new Kubernetes secrets reader.
/// The `context` parameter works like
/// [`KubernetesOrchestratorConfig::context`](crate::KubernetesOrchestratorConfig::context).
pub async fn new(
context: String,
name_prefix: Option<String>,
) -> Result<KubernetesSecretsReader, anyhow::Error> {
let (client, _) = util::create_client(context).await?;
let secret_api: Api<Secret> = Api::default_namespaced(client);
let name_prefix = name_prefix.clone().unwrap_or_default();
Ok(KubernetesSecretsReader {
impl SecretsReader for KubernetesSecretsReader {
async fn read(&self, id: CatalogItemId) -> Result<Vec<u8>, anyhow::Error> {
let secret = self
.get(&secret_name(id, &self.name_prefix))
let mut data = secret
.ok_or_else(|| anyhow!("internal error: secret missing data field"))?;
let contents = data
.ok_or_else(|| anyhow!("internal error: secret missing contents field"))?;
const SECRET_NAME_PREFIX: &str = "user-managed-";
fn secret_name(id: CatalogItemId, name_prefix: &str) -> String {
fn from_secret_name(name: &str, name_prefix: &str) -> Option<CatalogItemId> {
.and_then(|name| name.strip_prefix(SECRET_NAME_PREFIX))
.and_then(|id| id.parse().ok())