mz_cloud_resources/
crd.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Kubernetes custom resources
11
12use std::collections::BTreeMap;
13use std::time::Duration;
14
15use futures::future::join_all;
16use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
17use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
18use kube::{
19    Api, Client, Resource, ResourceExt,
20    api::{ObjectMeta, Patch, PatchParams},
21    core::crd::merge_crds,
22    runtime::{conditions, wait::await_condition},
23};
24use rand::{Rng, distr::Uniform};
25use schemars::JsonSchema;
26use serde::{Deserialize, Serialize};
27use tracing::{info, warn};
28
29use crate::crd::generated::cert_manager::certificates::{
30    CertificateIssuerRef, CertificateSecretTemplate,
31};
32use mz_ore::retry::Retry;
33
34pub mod generated;
35pub mod materialize;
36#[cfg(feature = "vpc-endpoints")]
37pub mod vpc_endpoint;
38
// User-configurable portion of a cert-manager Certificate spec.
//
// This is intentionally a subset of the fields of a Certificate.
// We do not want customers to configure options that may conflict with
// things we override or expand in our code.
//
// Every field is optional. Because of `rename_all = "camelCase"`, the fields
// are (de)serialized as `dnsNames`, `duration`, `renewBefore`, `issuerRef`
// and `secretTemplate`.
//
// NOTE(review): this header is a plain `//` comment rather than `///`; with
// the `JsonSchema` derive, doc comments presumably become descriptions in the
// generated schema, so confirm the effect before converting it or before
// rewording the field docs below.
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct MaterializeCertSpec {
    /// Additional DNS names the certificate will be valid for.
    pub dns_names: Option<Vec<String>>,
    /// Duration the certificate will be requested for.
    /// Value must be in units accepted by Go
    /// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
    pub duration: Option<String>,
    /// Duration before expiration the certificate will be renewed.
    /// Value must be in units accepted by Go
    /// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
    pub renew_before: Option<String>,
    /// Reference to an `Issuer` or `ClusterIssuer` that will generate the certificate.
    pub issuer_ref: Option<CertificateIssuerRef>,
    /// Additional annotations and labels to include in the Certificate object.
    pub secret_template: Option<CertificateSecretTemplate>,
}
60
61pub trait ManagedResource: Resource<DynamicType = ()> + Sized {
62    fn default_labels(&self) -> BTreeMap<String, String> {
63        BTreeMap::new()
64    }
65
66    fn managed_resource_meta(&self, name: String) -> ObjectMeta {
67        ObjectMeta {
68            namespace: Some(self.meta().namespace.clone().unwrap()),
69            name: Some(name),
70            labels: Some(self.default_labels()),
71            owner_references: Some(vec![owner_reference(self)]),
72            ..Default::default()
73        }
74    }
75}
76
77fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
78    OwnerReference {
79        api_version: T::api_version(&()).to_string(),
80        kind: T::kind(&()).to_string(),
81        name: t.name_unchecked(),
82        uid: t.uid().unwrap(),
83        block_owner_deletion: Some(true),
84        ..Default::default()
85    }
86}
87
/// A group of `CustomResourceDefinition`s that are merged into a single CRD
/// when registered, along with the version to mark as stored.
#[derive(Debug, Clone)]
pub struct VersionedCrd {
    /// The definitions to merge (presumably one per API version of the same
    /// resource). Must be non-empty; the first entry's plural name and group
    /// are used to derive the name of the registered CRD.
    pub crds: Vec<CustomResourceDefinition>,
    /// Version name handed to `merge_crds` as the stored version.
    pub stored_version: String,
}
93
94pub async fn register_versioned_crds(
95    kube_client: Client,
96    versioned_crds: Vec<VersionedCrd>,
97    field_manager: &str,
98) -> Result<(), anyhow::Error> {
99    let crd_futures = versioned_crds
100        .into_iter()
101        .map(|versioned_crd| register_w_retry(kube_client.clone(), versioned_crd, field_manager));
102    for res in join_all(crd_futures).await {
103        if res.is_err() {
104            return res;
105        }
106    }
107    Ok(())
108}
109
110async fn register_w_retry(
111    kube_client: Client,
112    versioned_crds: VersionedCrd,
113    field_manager: &str,
114) -> Result<(), anyhow::Error> {
115    Retry::default()
116        .max_duration(Duration::from_secs(30))
117        .clamp_backoff(Duration::from_secs(5))
118        .retry_async(|_| async {
119            let res = register_custom_resource(
120                kube_client.clone(),
121                versioned_crds.clone(),
122                field_manager,
123            )
124            .await;
125            if let Err(err) = &res {
126                warn!(err = %err);
127            }
128            res
129        })
130        .await?;
131    Ok(())
132}
133
134/// Registers a custom resource with Kubernetes,
135/// the specification of which is automatically derived from the structs.
136async fn register_custom_resource(
137    kube_client: Client,
138    versioned_crds: VersionedCrd,
139    field_manager: &str,
140) -> Result<(), anyhow::Error> {
141    let crds = versioned_crds.crds;
142    let crd_name = format!("{}.{}", &crds[0].spec.names.plural, &crds[0].spec.group);
143    info!("Registering {} crd", &crd_name);
144    let crd_api = Api::<CustomResourceDefinition>::all(kube_client);
145    let crd = merge_crds(crds, &versioned_crds.stored_version).unwrap();
146    let crd_json = serde_json::to_string(&serde_json::json!(&crd))?;
147    info!(crd_json = %crd_json);
148    crd_api
149        .patch(
150            &crd_name,
151            &PatchParams::apply(field_manager).force(),
152            &Patch::Apply(crd),
153        )
154        .await?;
155    await_condition(crd_api, &crd_name, conditions::is_crd_established()).await?;
156    info!("Done registering {} crd", &crd_name);
157    Ok(())
158}
159
160pub fn new_resource_id() -> String {
161    // DNS-1035 names are supposed to be case insensitive,
162    // so we define our own character set, rather than use the
163    // built-in Alphanumeric distribution from rand, which
164    // includes both upper and lowercase letters.
165    const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
166    rand::rng()
167        .sample_iter(Uniform::new(0, CHARSET.len()).expect("valid range"))
168        .take(10)
169        .map(|i| char::from(CHARSET[i]))
170        .collect()
171}