1use std::time::Duration;
11
12use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition;
13use kube::{
14 Api, Client, CustomResourceExt, Resource, ResourceExt,
15 api::{DeleteParams, Patch, PatchParams},
16};
17use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds};
18use serde::{Serialize, de::DeserializeOwned};
19use tracing::info;
20
/// Field-manager name passed to the Kubernetes API by this module: it is used
/// for the apply patches in `apply_resource` and handed to
/// `register_versioned_crds` in `register_crds`.
const FIELD_MANAGER: &str = "orchestratord.materialize.cloud";
22
23pub async fn get_resource<K>(api: &Api<K>, name: &str) -> Result<Option<K>, anyhow::Error>
24where
25 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
26 <K as Resource>::DynamicType: Default,
27{
28 match api.get(name).await {
29 Ok(k) => Ok(Some(k)),
30 Err(kube::Error::Api(e)) if e.code == 404 => Ok(None),
31 Err(e) => Err(e.into()),
32 }
33}
34
35pub async fn apply_resource<K>(api: &Api<K>, resource: &K) -> Result<(), anyhow::Error>
36where
37 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
38 <K as Resource>::DynamicType: Default,
39{
40 api.patch(
41 &resource.name_unchecked(),
42 &PatchParams::apply(FIELD_MANAGER).force(),
43 &Patch::Apply(resource),
44 )
45 .await?;
46 Ok(())
47}
48
49pub async fn delete_resource<K>(api: &Api<K>, name: &str) -> Result<(), anyhow::Error>
50where
51 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
52 <K as Resource>::DynamicType: Default,
53{
54 match kube::runtime::wait::delete::delete_and_finalize(
55 api.clone(),
56 name,
57 &DeleteParams::foreground(),
58 )
59 .await
60 {
61 Ok(_) => Ok(()),
62 Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e))) if e.code == 404 => {
63 Ok(())
65 }
66 Err(e) => Err(e.into()),
67 }
68}
69
70pub async fn register_crds(
71 client: Client,
72 additional_crd_columns: Vec<CustomResourceColumnDefinition>,
73) -> Result<(), anyhow::Error> {
74 let mut mz_crd = crd::materialize::v1alpha1::Materialize::crd();
75 let default_columns = mz_crd.spec.versions[0]
76 .additional_printer_columns
77 .take()
78 .expect("should contain ImageRef and UpToDate columns");
79 mz_crd.spec.versions[0].additional_printer_columns = Some(
80 additional_crd_columns
81 .into_iter()
82 .chain(default_columns)
83 .collect(),
84 );
85 tokio::time::timeout(
86 Duration::from_secs(120),
87 register_versioned_crds(
88 client.clone(),
89 vec![
90 VersionedCrd {
91 crds: vec![mz_crd],
92 stored_version: String::from("v1alpha1"),
93 },
94 VersionedCrd {
95 crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()],
96 stored_version: String::from("v1"),
97 },
98 ],
99 FIELD_MANAGER,
100 ),
101 )
102 .await??;
103
104 info!("Done rewriting CRDs");
105
106 Ok(())
107}