mz_cloud_resources/
crd.rs1use std::collections::BTreeMap;
13use std::time::Duration;
14
15use futures::future::join_all;
16use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
17use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
18use kube::{
19 Api, Client, Resource, ResourceExt,
20 api::{ObjectMeta, Patch, PatchParams},
21 core::crd::merge_crds,
22 runtime::{conditions, wait::await_condition},
23};
24use rand::{Rng, distr::Uniform};
25use schemars::JsonSchema;
26use serde::{Deserialize, Serialize};
27use tracing::{info, warn};
28
29use crate::crd::generated::cert_manager::certificates::{
30 CertificateIssuerRef, CertificatePrivateKeyAlgorithm, CertificateSecretTemplate,
31};
32use mz_ore::retry::Retry;
33
34pub mod balancer;
35pub mod console;
36pub mod generated;
37pub mod materialize;
38#[cfg(feature = "vpc-endpoints")]
39pub mod vpc_endpoint;
40
41#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
45#[serde(rename_all = "camelCase")]
46pub struct MaterializeCertSpec {
47 pub dns_names: Option<Vec<String>>,
49 pub duration: Option<String>,
53 pub renew_before: Option<String>,
57 pub issuer_ref: Option<CertificateIssuerRef>,
59 pub secret_template: Option<CertificateSecretTemplate>,
61 #[serde(skip_serializing_if = "Option::is_none")]
63 pub private_key_algorithm: Option<CertificatePrivateKeyAlgorithm>,
64 #[serde(skip_serializing_if = "Option::is_none")]
66 pub private_key_size: Option<i64>,
67}
68
69pub trait ManagedResource: Resource<DynamicType = ()> + Sized {
70 fn default_labels(&self) -> BTreeMap<String, String> {
71 BTreeMap::new()
72 }
73
74 fn managed_resource_meta(&self, name: String) -> ObjectMeta {
75 ObjectMeta {
76 namespace: Some(self.meta().namespace.clone().unwrap()),
77 name: Some(name),
78 labels: Some(self.default_labels()),
79 owner_references: Some(vec![owner_reference(self)]),
80 ..Default::default()
81 }
82 }
83}
84
85fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
86 OwnerReference {
87 api_version: T::api_version(&()).to_string(),
88 kind: T::kind(&()).to_string(),
89 name: t.name_unchecked(),
90 uid: t.uid().unwrap(),
91 block_owner_deletion: Some(true),
92 ..Default::default()
93 }
94}
95
96#[derive(Debug, Clone)]
97pub struct VersionedCrd {
98 pub crds: Vec<CustomResourceDefinition>,
99 pub stored_version: String,
100}
101
102pub async fn register_versioned_crds(
103 kube_client: Client,
104 versioned_crds: Vec<VersionedCrd>,
105 field_manager: &str,
106) -> Result<(), anyhow::Error> {
107 let crd_futures = versioned_crds
108 .into_iter()
109 .map(|versioned_crd| register_w_retry(kube_client.clone(), versioned_crd, field_manager));
110 for res in join_all(crd_futures).await {
111 if res.is_err() {
112 return res;
113 }
114 }
115 Ok(())
116}
117
118async fn register_w_retry(
119 kube_client: Client,
120 versioned_crds: VersionedCrd,
121 field_manager: &str,
122) -> Result<(), anyhow::Error> {
123 Retry::default()
124 .max_duration(Duration::from_secs(30))
125 .clamp_backoff(Duration::from_secs(5))
126 .retry_async(|_| async {
127 let res = register_custom_resource(
128 kube_client.clone(),
129 versioned_crds.clone(),
130 field_manager,
131 )
132 .await;
133 if let Err(err) = &res {
134 warn!(err = %err);
135 }
136 res
137 })
138 .await?;
139 Ok(())
140}
141
142async fn register_custom_resource(
145 kube_client: Client,
146 versioned_crds: VersionedCrd,
147 field_manager: &str,
148) -> Result<(), anyhow::Error> {
149 let crds = versioned_crds.crds;
150 let crd_name = format!("{}.{}", &crds[0].spec.names.plural, &crds[0].spec.group);
151 info!("Registering {} crd", &crd_name);
152 let crd_api = Api::<CustomResourceDefinition>::all(kube_client);
153 let crd = merge_crds(crds, &versioned_crds.stored_version).unwrap();
154 let crd_json = serde_json::to_string(&serde_json::json!(&crd))?;
155 info!(crd_json = %crd_json);
156 crd_api
157 .patch(
158 &crd_name,
159 &PatchParams::apply(field_manager).force(),
160 &Patch::Apply(crd),
161 )
162 .await?;
163 await_condition(crd_api, &crd_name, conditions::is_crd_established()).await?;
164 info!("Done registering {} crd", &crd_name);
165 Ok(())
166}
167
168pub fn new_resource_id() -> String {
169 const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
174 rand::rng()
175 .sample_iter(Uniform::new(0, CHARSET.len()).expect("valid range"))
176 .take(10)
177 .map(|i| char::from(CHARSET[i]))
178 .collect()
179}