Skip to main content

mz_cloud_resources/
crd.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Kubernetes custom resources
11
12use std::collections::BTreeMap;
13use std::time::Duration;
14
15use futures::future::join_all;
16use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
17use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
18use kube::{
19    Api, Client, Resource, ResourceExt,
20    api::{ObjectMeta, Patch, PatchParams},
21    core::crd::merge_crds,
22    runtime::{conditions, wait::await_condition},
23};
24use rand::{Rng, distr::Uniform};
25use schemars::JsonSchema;
26use serde::{Deserialize, Serialize};
27use tracing::{info, warn};
28
29use crate::crd::generated::cert_manager::certificates::{
30    CertificateIssuerRef, CertificatePrivateKeyAlgorithm, CertificateSecretTemplate,
31};
32use mz_ore::retry::Retry;
33
34pub mod balancer;
35pub mod console;
36pub mod generated;
37pub mod materialize;
38#[cfg(feature = "vpc-endpoints")]
39pub mod vpc_endpoint;
40
41// This is intentionally a subset of the fields of a Certificate.
42// We do not want customers to configure options that may conflict with
43// things we override or expand in our code.
44#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
45#[serde(rename_all = "camelCase")]
46pub struct MaterializeCertSpec {
47    /// Additional DNS names the certificate will be valid for.
48    pub dns_names: Option<Vec<String>>,
49    /// Duration the certificate will be requested for.
50    /// Value must be in units accepted by Go
51    /// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
52    pub duration: Option<String>,
53    /// Duration before expiration the certificate will be renewed.
54    /// Value must be in units accepted by Go
55    /// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
56    pub renew_before: Option<String>,
57    /// Reference to an `Issuer` or `ClusterIssuer` that will generate the certificate.
58    pub issuer_ref: Option<CertificateIssuerRef>,
59    /// Additional annotations and labels to include in the Certificate object.
60    pub secret_template: Option<CertificateSecretTemplate>,
61    /// Optional algorithm to use for the private key. If not specified, a recommended default will be chosen.
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub private_key_algorithm: Option<CertificatePrivateKeyAlgorithm>,
64    /// Optional size for the private key.
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub private_key_size: Option<i64>,
67}
68
69pub trait ManagedResource: Resource<DynamicType = ()> + Sized {
70    fn default_labels(&self) -> BTreeMap<String, String> {
71        BTreeMap::new()
72    }
73
74    fn app_name(&self) -> Option<&str> {
75        None
76    }
77
78    fn managed_resource_meta(&self, name: String) -> ObjectMeta {
79        let mut labels = self.default_labels();
80        if let Some(app) = self.app_name() {
81            labels.insert("app.kubernetes.io/name".to_owned(), app.to_owned());
82        }
83        ObjectMeta {
84            namespace: Some(self.meta().namespace.clone().unwrap()),
85            name: Some(name),
86            labels: Some(labels),
87            owner_references: Some(vec![owner_reference(self)]),
88            ..Default::default()
89        }
90    }
91}
92
93fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
94    OwnerReference {
95        api_version: T::api_version(&()).to_string(),
96        kind: T::kind(&()).to_string(),
97        name: t.name_unchecked(),
98        uid: t.uid().unwrap(),
99        block_owner_deletion: Some(true),
100        ..Default::default()
101    }
102}
103
104#[derive(Debug, Clone)]
105pub struct VersionedCrd {
106    pub crds: Vec<CustomResourceDefinition>,
107    pub stored_version: String,
108}
109
110pub async fn register_versioned_crds(
111    kube_client: Client,
112    versioned_crds: Vec<VersionedCrd>,
113    field_manager: &str,
114) -> Result<(), anyhow::Error> {
115    let crd_futures = versioned_crds
116        .into_iter()
117        .map(|versioned_crd| register_w_retry(kube_client.clone(), versioned_crd, field_manager));
118    for res in join_all(crd_futures).await {
119        if res.is_err() {
120            return res;
121        }
122    }
123    Ok(())
124}
125
126async fn register_w_retry(
127    kube_client: Client,
128    versioned_crds: VersionedCrd,
129    field_manager: &str,
130) -> Result<(), anyhow::Error> {
131    Retry::default()
132        .max_duration(Duration::from_secs(30))
133        .clamp_backoff(Duration::from_secs(5))
134        .retry_async(|_| async {
135            let res = register_custom_resource(
136                kube_client.clone(),
137                versioned_crds.clone(),
138                field_manager,
139            )
140            .await;
141            if let Err(err) = &res {
142                warn!(err = %err);
143            }
144            res
145        })
146        .await?;
147    Ok(())
148}
149
150/// Registers a custom resource with Kubernetes,
151/// the specification of which is automatically derived from the structs.
152async fn register_custom_resource(
153    kube_client: Client,
154    versioned_crds: VersionedCrd,
155    field_manager: &str,
156) -> Result<(), anyhow::Error> {
157    let crds = versioned_crds.crds;
158    let crd_name = format!("{}.{}", &crds[0].spec.names.plural, &crds[0].spec.group);
159    info!("Registering {} crd", &crd_name);
160    let crd_api = Api::<CustomResourceDefinition>::all(kube_client);
161    let crd = merge_crds(crds, &versioned_crds.stored_version).unwrap();
162    let crd_json = serde_json::to_string(&serde_json::json!(&crd))?;
163    info!(crd_json = %crd_json);
164    crd_api
165        .patch(
166            &crd_name,
167            &PatchParams::apply(field_manager).force(),
168            &Patch::Apply(crd),
169        )
170        .await?;
171    await_condition(crd_api, &crd_name, conditions::is_crd_established()).await?;
172    info!("Done registering {} crd", &crd_name);
173    Ok(())
174}
175
176pub fn new_resource_id() -> String {
177    // DNS-1035 names are supposed to be case insensitive,
178    // so we define our own character set, rather than use the
179    // built-in Alphanumeric distribution from rand, which
180    // includes both upper and lowercase letters.
181    const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
182    rand::rng()
183        .sample_iter(Uniform::new(0, CHARSET.len()).expect("valid range"))
184        .take(10)
185        .map(|i| char::from(CHARSET[i]))
186        .collect()
187}