mz_cloud_resources/
crd.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Kubernetes custom resources
11
12use std::collections::BTreeMap;
13use std::time::Duration;
14
15use futures::future::join_all;
16use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
17use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
18use kube::{
19    Api, Client, Resource, ResourceExt,
20    api::{ObjectMeta, Patch, PatchParams},
21    core::crd::merge_crds,
22    runtime::{conditions, wait::await_condition},
23};
24use rand::{Rng, distr::Uniform};
25use schemars::JsonSchema;
26use serde::{Deserialize, Serialize};
27use tracing::{info, warn};
28
29use crate::crd::generated::cert_manager::certificates::{
30    CertificateIssuerRef, CertificateSecretTemplate,
31};
32use mz_ore::retry::Retry;
33
34pub mod balancer;
35pub mod console;
36pub mod generated;
37pub mod materialize;
38#[cfg(feature = "vpc-endpoints")]
39pub mod vpc_endpoint;
40
// This is intentionally a subset of the fields of a Certificate.
// We do not want customers to configure options that may conflict with
// things we override or expand in our code.
//
// Every field is optional. Because of `rename_all = "camelCase"`, the
// serialized keys are `dnsNames`, `duration`, `renewBefore`, `issuerRef`
// and `secretTemplate`.
//
// NOTE(review): since `JsonSchema` is derived here, the `///` comments on the
// fields below are presumably emitted as descriptions in the generated
// schema. Confirm before rewording them, as that would change the output.
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct MaterializeCertSpec {
    /// Additional DNS names the certificate will be valid for.
    pub dns_names: Option<Vec<String>>,
    /// Duration the certificate will be requested for.
    /// Value must be in units accepted by Go
    /// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
    pub duration: Option<String>,
    /// Duration before expiration the certificate will be renewed.
    /// Value must be in units accepted by Go
    /// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
    pub renew_before: Option<String>,
    /// Reference to an `Issuer` or `ClusterIssuer` that will generate the certificate.
    pub issuer_ref: Option<CertificateIssuerRef>,
    /// Additional annotations and labels to include in the Certificate object.
    pub secret_template: Option<CertificateSecretTemplate>,
}
62
63pub trait ManagedResource: Resource<DynamicType = ()> + Sized {
64    fn default_labels(&self) -> BTreeMap<String, String> {
65        BTreeMap::new()
66    }
67
68    fn managed_resource_meta(&self, name: String) -> ObjectMeta {
69        ObjectMeta {
70            namespace: Some(self.meta().namespace.clone().unwrap()),
71            name: Some(name),
72            labels: Some(self.default_labels()),
73            owner_references: Some(vec![owner_reference(self)]),
74            ..Default::default()
75        }
76    }
77}
78
79fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
80    OwnerReference {
81        api_version: T::api_version(&()).to_string(),
82        kind: T::kind(&()).to_string(),
83        name: t.name_unchecked(),
84        uid: t.uid().unwrap(),
85        block_owner_deletion: Some(true),
86        ..Default::default()
87    }
88}
89
90#[derive(Debug, Clone)]
91pub struct VersionedCrd {
92    pub crds: Vec<CustomResourceDefinition>,
93    pub stored_version: String,
94}
95
96pub async fn register_versioned_crds(
97    kube_client: Client,
98    versioned_crds: Vec<VersionedCrd>,
99    field_manager: &str,
100) -> Result<(), anyhow::Error> {
101    let crd_futures = versioned_crds
102        .into_iter()
103        .map(|versioned_crd| register_w_retry(kube_client.clone(), versioned_crd, field_manager));
104    for res in join_all(crd_futures).await {
105        if res.is_err() {
106            return res;
107        }
108    }
109    Ok(())
110}
111
112async fn register_w_retry(
113    kube_client: Client,
114    versioned_crds: VersionedCrd,
115    field_manager: &str,
116) -> Result<(), anyhow::Error> {
117    Retry::default()
118        .max_duration(Duration::from_secs(30))
119        .clamp_backoff(Duration::from_secs(5))
120        .retry_async(|_| async {
121            let res = register_custom_resource(
122                kube_client.clone(),
123                versioned_crds.clone(),
124                field_manager,
125            )
126            .await;
127            if let Err(err) = &res {
128                warn!(err = %err);
129            }
130            res
131        })
132        .await?;
133    Ok(())
134}
135
136/// Registers a custom resource with Kubernetes,
137/// the specification of which is automatically derived from the structs.
138async fn register_custom_resource(
139    kube_client: Client,
140    versioned_crds: VersionedCrd,
141    field_manager: &str,
142) -> Result<(), anyhow::Error> {
143    let crds = versioned_crds.crds;
144    let crd_name = format!("{}.{}", &crds[0].spec.names.plural, &crds[0].spec.group);
145    info!("Registering {} crd", &crd_name);
146    let crd_api = Api::<CustomResourceDefinition>::all(kube_client);
147    let crd = merge_crds(crds, &versioned_crds.stored_version).unwrap();
148    let crd_json = serde_json::to_string(&serde_json::json!(&crd))?;
149    info!(crd_json = %crd_json);
150    crd_api
151        .patch(
152            &crd_name,
153            &PatchParams::apply(field_manager).force(),
154            &Patch::Apply(crd),
155        )
156        .await?;
157    await_condition(crd_api, &crd_name, conditions::is_crd_established()).await?;
158    info!("Done registering {} crd", &crd_name);
159    Ok(())
160}
161
162pub fn new_resource_id() -> String {
163    // DNS-1035 names are supposed to be case insensitive,
164    // so we define our own character set, rather than use the
165    // built-in Alphanumeric distribution from rand, which
166    // includes both upper and lowercase letters.
167    const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
168    rand::rng()
169        .sample_iter(Uniform::new(0, CHARSET.len()).expect("valid range"))
170        .take(10)
171        .map(|i| char::from(CHARSET[i]))
172        .collect()
173}