Skip to main content

mz_cloud_resources/
crd.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Kubernetes custom resources
11
12use std::collections::BTreeMap;
13use std::time::Duration;
14
15use futures::future::join_all;
16use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::{
17    CustomResourceConversion, CustomResourceDefinition,
18};
19use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
20use kube::{
21    Api, Client, Resource, ResourceExt,
22    api::{ObjectMeta, Patch, PatchParams},
23    core::crd::merge_crds,
24    runtime::{conditions, wait::await_condition},
25};
26use rand::{Rng, distr::Uniform};
27use schemars::JsonSchema;
28use serde::{Deserialize, Serialize};
29use tracing::{info, warn};
30
31use crate::crd::generated::cert_manager::certificates::{
32    CertificateIssuerRef, CertificatePrivateKeyAlgorithm, CertificateSecretTemplate,
33};
34use mz_ore::retry::Retry;
35
36pub mod balancer;
37pub mod console;
38pub mod generated;
39pub mod materialize;
40#[cfg(feature = "vpc-endpoints")]
41pub mod vpc_endpoint;
42
// User-facing certificate configuration.
//
// This is intentionally a subset of the fields of a cert-manager Certificate.
// We do not want customers to configure options that may conflict with
// things we override or expand in our code.
//
// NOTE(review): this type derives `JsonSchema`, so the `///` field comments
// below are presumably surfaced as descriptions in the generated schema;
// treat edits to them as user-visible changes — confirm before rewording.
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct MaterializeCertSpec {
    /// Additional DNS names the certificate will be valid for.
    pub dns_names: Option<Vec<String>>,
    /// Duration the certificate will be requested for.
    /// Value must be in units accepted by Go
    /// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
    pub duration: Option<String>,
    /// Duration before expiration the certificate will be renewed.
    /// Value must be in units accepted by Go
    /// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
    pub renew_before: Option<String>,
    /// Reference to an `Issuer` or `ClusterIssuer` that will generate the certificate.
    pub issuer_ref: Option<CertificateIssuerRef>,
    /// Additional annotations and labels to include in the Certificate object.
    pub secret_template: Option<CertificateSecretTemplate>,
    /// Optional algorithm to use for the private key. If not specified, a recommended default will be chosen.
    // Unlike the fields above, this one is omitted from serialized output
    // entirely when it is `None` instead of being written as `null`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub private_key_algorithm: Option<CertificatePrivateKeyAlgorithm>,
    /// Optional size for the private key.
    // Also omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub private_key_size: Option<i64>,
}
70
71pub trait ManagedResource: Resource<DynamicType = ()> + Sized {
72    fn default_labels(&self) -> BTreeMap<String, String> {
73        BTreeMap::new()
74    }
75
76    fn app_name(&self) -> Option<&str> {
77        None
78    }
79
80    fn managed_resource_meta(&self, name: String) -> ObjectMeta {
81        let mut labels = self.default_labels();
82        if let Some(app) = self.app_name() {
83            labels.insert("app.kubernetes.io/name".to_owned(), app.to_owned());
84        }
85        ObjectMeta {
86            namespace: Some(self.meta().namespace.clone().unwrap()),
87            name: Some(name),
88            labels: Some(labels),
89            owner_references: Some(vec![owner_reference(self)]),
90            ..Default::default()
91        }
92    }
93}
94
95fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
96    OwnerReference {
97        api_version: T::api_version(&()).to_string(),
98        kind: T::kind(&()).to_string(),
99        name: t.name_unchecked(),
100        uid: t.uid().unwrap(),
101        block_owner_deletion: Some(true),
102        ..Default::default()
103    }
104}
105
/// The per-version definitions of one custom resource, together with the
/// information needed to combine them into a single
/// `CustomResourceDefinition` at registration time.
#[derive(Debug, Clone)]
pub struct VersionedCrd {
    /// The CRDs to merge with `merge_crds`. Must be non-empty: registration
    /// derives the CRD name from the first entry.
    pub crds: Vec<CustomResourceDefinition>,
    /// The version name passed to `merge_crds` as the stored version.
    pub stored_version: String,
    /// Conversion configuration to apply after merging CRDs.
    /// `merge_crds` drops the conversion field, so we must set it after merging.
    pub conversion: Option<CustomResourceConversion>,
}
114
115pub async fn register_versioned_crds(
116    kube_client: Client,
117    versioned_crds: Vec<VersionedCrd>,
118    field_manager: &str,
119) -> Result<(), anyhow::Error> {
120    let crd_futures = versioned_crds
121        .into_iter()
122        .map(|versioned_crd| register_w_retry(kube_client.clone(), versioned_crd, field_manager));
123    for res in join_all(crd_futures).await {
124        if res.is_err() {
125            return res;
126        }
127    }
128    Ok(())
129}
130
131async fn register_w_retry(
132    kube_client: Client,
133    versioned_crds: VersionedCrd,
134    field_manager: &str,
135) -> Result<(), anyhow::Error> {
136    Retry::default()
137        .max_duration(Duration::from_secs(30))
138        .clamp_backoff(Duration::from_secs(5))
139        .retry_async(|_| async {
140            let res = register_custom_resource(
141                kube_client.clone(),
142                versioned_crds.clone(),
143                field_manager,
144            )
145            .await;
146            if let Err(err) = &res {
147                warn!(err = %err);
148            }
149            res
150        })
151        .await?;
152    Ok(())
153}
154
155/// Registers a custom resource with Kubernetes,
156/// the specification of which is automatically derived from the structs.
157async fn register_custom_resource(
158    kube_client: Client,
159    versioned_crds: VersionedCrd,
160    field_manager: &str,
161) -> Result<(), anyhow::Error> {
162    let crds = versioned_crds.crds;
163    let crd_name = format!("{}.{}", &crds[0].spec.names.plural, &crds[0].spec.group);
164    info!("Registering {} crd", &crd_name);
165    let crd_api = Api::<CustomResourceDefinition>::all(kube_client);
166    let mut crd = merge_crds(crds, &versioned_crds.stored_version).unwrap();
167    if let Some(conversion) = versioned_crds.conversion {
168        crd.spec.conversion = Some(conversion);
169    }
170    let crd_json = serde_json::to_string(&serde_json::json!(&crd))?;
171    info!(crd_json = %crd_json);
172    crd_api
173        .patch(
174            &crd_name,
175            &PatchParams::apply(field_manager).force(),
176            &Patch::Apply(crd),
177        )
178        .await?;
179    await_condition(crd_api, &crd_name, conditions::is_crd_established()).await?;
180    info!("Done registering {} crd", &crd_name);
181    Ok(())
182}
183
184pub fn new_resource_id() -> String {
185    // DNS-1035 names are supposed to be case insensitive,
186    // so we define our own character set, rather than use the
187    // built-in Alphanumeric distribution from rand, which
188    // includes both upper and lowercase letters.
189    const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
190    rand::rng()
191        .sample_iter(Uniform::new(0, CHARSET.len()).expect("valid range"))
192        .take(10)
193        .map(|i| char::from(CHARSET[i]))
194        .collect()
195}