mz_cloud_resources/
crd.rs1use std::collections::BTreeMap;
13use std::time::Duration;
14
15use futures::future::join_all;
16use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
17use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
18use kube::{
19 Api, Client, Resource, ResourceExt,
20 api::{ObjectMeta, Patch, PatchParams},
21 core::crd::merge_crds,
22 runtime::{conditions, wait::await_condition},
23};
24use rand::{Rng, distr::Uniform};
25use schemars::JsonSchema;
26use serde::{Deserialize, Serialize};
27use tracing::{info, warn};
28
29use crate::crd::generated::cert_manager::certificates::{
30 CertificateIssuerRef, CertificateSecretTemplate,
31};
32use mz_ore::retry::Retry;
33
34pub mod generated;
35pub mod materialize;
36#[cfg(feature = "vpc-endpoints")]
37pub mod vpc_endpoint;
38
39#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
43#[serde(rename_all = "camelCase")]
44pub struct MaterializeCertSpec {
45 pub dns_names: Option<Vec<String>>,
47 pub duration: Option<String>,
51 pub renew_before: Option<String>,
55 pub issuer_ref: Option<CertificateIssuerRef>,
57 pub secret_template: Option<CertificateSecretTemplate>,
59}
60
61pub trait ManagedResource: Resource<DynamicType = ()> + Sized {
62 fn default_labels(&self) -> BTreeMap<String, String> {
63 BTreeMap::new()
64 }
65
66 fn managed_resource_meta(&self, name: String) -> ObjectMeta {
67 ObjectMeta {
68 namespace: Some(self.meta().namespace.clone().unwrap()),
69 name: Some(name),
70 labels: Some(self.default_labels()),
71 owner_references: Some(vec![owner_reference(self)]),
72 ..Default::default()
73 }
74 }
75}
76
77fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
78 OwnerReference {
79 api_version: T::api_version(&()).to_string(),
80 kind: T::kind(&()).to_string(),
81 name: t.name_unchecked(),
82 uid: t.uid().unwrap(),
83 block_owner_deletion: Some(true),
84 ..Default::default()
85 }
86}
87
88#[derive(Debug, Clone)]
89pub struct VersionedCrd {
90 pub crds: Vec<CustomResourceDefinition>,
91 pub stored_version: String,
92}
93
94pub async fn register_versioned_crds(
95 kube_client: Client,
96 versioned_crds: Vec<VersionedCrd>,
97 field_manager: &str,
98) -> Result<(), anyhow::Error> {
99 let crd_futures = versioned_crds
100 .into_iter()
101 .map(|versioned_crd| register_w_retry(kube_client.clone(), versioned_crd, field_manager));
102 for res in join_all(crd_futures).await {
103 if res.is_err() {
104 return res;
105 }
106 }
107 Ok(())
108}
109
110async fn register_w_retry(
111 kube_client: Client,
112 versioned_crds: VersionedCrd,
113 field_manager: &str,
114) -> Result<(), anyhow::Error> {
115 Retry::default()
116 .max_duration(Duration::from_secs(30))
117 .clamp_backoff(Duration::from_secs(5))
118 .retry_async(|_| async {
119 let res = register_custom_resource(
120 kube_client.clone(),
121 versioned_crds.clone(),
122 field_manager,
123 )
124 .await;
125 if let Err(err) = &res {
126 warn!(err = %err);
127 }
128 res
129 })
130 .await?;
131 Ok(())
132}
133
134async fn register_custom_resource(
137 kube_client: Client,
138 versioned_crds: VersionedCrd,
139 field_manager: &str,
140) -> Result<(), anyhow::Error> {
141 let crds = versioned_crds.crds;
142 let crd_name = format!("{}.{}", &crds[0].spec.names.plural, &crds[0].spec.group);
143 info!("Registering {} crd", &crd_name);
144 let crd_api = Api::<CustomResourceDefinition>::all(kube_client);
145 let crd = merge_crds(crds, &versioned_crds.stored_version).unwrap();
146 let crd_json = serde_json::to_string(&serde_json::json!(&crd))?;
147 info!(crd_json = %crd_json);
148 crd_api
149 .patch(
150 &crd_name,
151 &PatchParams::apply(field_manager).force(),
152 &Patch::Apply(crd),
153 )
154 .await?;
155 await_condition(crd_api, &crd_name, conditions::is_crd_established()).await?;
156 info!("Done registering {} crd", &crd_name);
157 Ok(())
158}
159
160pub fn new_resource_id() -> String {
161 const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
166 rand::rng()
167 .sample_iter(Uniform::new(0, CHARSET.len()).expect("valid range"))
168 .take(10)
169 .map(|i| char::from(CHARSET[i]))
170 .collect()
171}