mz_cloud_resources/
crd.rs
1use std::time::Duration;
13
14use futures::future::join_all;
15use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
16use kube::api::{Patch, PatchParams};
17use kube::{Api, Client};
18use kube::{
19 core::crd::merge_crds,
20 runtime::{conditions, wait::await_condition},
21};
22use tracing::{info, warn};
23
24use mz_ore::retry::Retry;
25
26pub mod generated;
27pub mod materialize;
28#[cfg(feature = "vpc-endpoints")]
29pub mod vpc_endpoint;
30
31#[derive(Debug, Clone)]
32pub struct VersionedCrd {
33 pub crds: Vec<CustomResourceDefinition>,
34 pub stored_version: String,
35}
36
37pub async fn register_versioned_crds(
38 kube_client: Client,
39 versioned_crds: Vec<VersionedCrd>,
40 field_manager: &str,
41) -> Result<(), anyhow::Error> {
42 let crd_futures = versioned_crds
43 .into_iter()
44 .map(|versioned_crd| register_w_retry(kube_client.clone(), versioned_crd, field_manager));
45 for res in join_all(crd_futures).await {
46 if res.is_err() {
47 return res;
48 }
49 }
50 Ok(())
51}
52
53async fn register_w_retry(
54 kube_client: Client,
55 versioned_crds: VersionedCrd,
56 field_manager: &str,
57) -> Result<(), anyhow::Error> {
58 Retry::default()
59 .max_duration(Duration::from_secs(30))
60 .clamp_backoff(Duration::from_secs(5))
61 .retry_async(|_| async {
62 let res = register_custom_resource(
63 kube_client.clone(),
64 versioned_crds.clone(),
65 field_manager,
66 )
67 .await;
68 if let Err(err) = &res {
69 warn!(err = %err);
70 }
71 res
72 })
73 .await?;
74 Ok(())
75}
76
77async fn register_custom_resource(
80 kube_client: Client,
81 versioned_crds: VersionedCrd,
82 field_manager: &str,
83) -> Result<(), anyhow::Error> {
84 let crds = versioned_crds.crds;
85 let crd_name = format!("{}.{}", &crds[0].spec.names.plural, &crds[0].spec.group);
86 info!("Registering {} crd", &crd_name);
87 let crd_api = Api::<CustomResourceDefinition>::all(kube_client);
88 let crd = merge_crds(crds, &versioned_crds.stored_version).unwrap();
89 let crd_json = serde_json::to_string(&serde_json::json!(&crd))?;
90 info!(crd_json = %crd_json);
91 crd_api
92 .patch(
93 &crd_name,
94 &PatchParams::apply(field_manager).force(),
95 &Patch::Apply(crd),
96 )
97 .await?;
98 await_condition(crd_api, &crd_name, conditions::is_crd_established()).await?;
99 info!("Done registering {} crd", &crd_name);
100 Ok(())
101}