1use std::time::Duration;
11
12use apiextensions::v1::CustomResourceColumnDefinition;
13use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions;
14use kube::{
15 Api, Client, CustomResourceExt, Resource, ResourceExt,
16 api::{DeleteParams, Patch, PatchParams, PostParams},
17 core::Status,
18 core::response::StatusSummary,
19};
20use serde::{Serialize, de::DeserializeOwned};
21use tracing::info;
22
23use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds};
24
/// Field-manager name this module passes to the Kubernetes API: it is used for
/// the server-side apply patch in `apply_resource` and handed to
/// `register_versioned_crds` in `register_crds`.
const FIELD_MANAGER: &str = "orchestratord.materialize.cloud";
26
27pub async fn get_resource<K>(api: &Api<K>, name: &str) -> Result<Option<K>, anyhow::Error>
28where
29 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
30 <K as Resource>::DynamicType: Default,
31{
32 match api.get(name).await {
33 Ok(k) => Ok(Some(k)),
34 Err(kube::Error::Api(e)) if e.code == 404 => Ok(None),
35 Err(e) => Err(e.into()),
36 }
37}
38
39pub async fn apply_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
40where
41 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
42 <K as Resource>::DynamicType: Default,
43{
44 Ok(api
45 .patch(
46 &resource.name_unchecked(),
47 &PatchParams::apply(FIELD_MANAGER).force(),
48 &Patch::Apply(resource),
49 )
50 .await?)
51}
52
53pub async fn replace_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
54where
55 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
56 <K as Resource>::DynamicType: Default,
57{
58 if resource.meta().resource_version.is_none() {
59 return Err(kube::Error::Api(
60 Box::new(Status {
61 status: Some(StatusSummary::Failure),
62 message: "Must use apply_resource instead of replace_resource to apply fully created resources.".to_string(),
63 reason: "BadRequest".to_string(),
64 code: 400,
65 metadata: None,
66 details: None,
67 }),
68 )
69 .into());
70 }
71 Ok(api
72 .replace(&resource.name_unchecked(), &PostParams::default(), resource)
73 .await?)
74}
75
76pub async fn delete_resource<K>(api: &Api<K>, name: &str) -> Result<(), anyhow::Error>
77where
78 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
79 <K as Resource>::DynamicType: Default,
80{
81 match kube::runtime::wait::delete::delete_and_finalize(
82 api.clone(),
83 name,
84 &DeleteParams::foreground(),
85 )
86 .await
87 {
88 Ok(_) => Ok(()),
89 Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e))) if e.code == 404 => {
90 Ok(())
92 }
93 Err(e) => Err(e.into()),
94 }
95}
96
97pub async fn register_crds(
98 client: Client,
99 additional_crd_columns: Vec<CustomResourceColumnDefinition>,
100) -> Result<(), anyhow::Error> {
101 let mut mz_crd = crd::materialize::v1alpha1::Materialize::crd();
102 let default_columns = mz_crd.spec.versions[0]
103 .additional_printer_columns
104 .take()
105 .expect("should contain ImageRef and UpToDate columns");
106 mz_crd.spec.versions[0].additional_printer_columns = Some(
107 additional_crd_columns
108 .into_iter()
109 .chain(default_columns)
110 .collect(),
111 );
112 tokio::time::timeout(
113 Duration::from_secs(120),
114 register_versioned_crds(
115 client.clone(),
116 vec![
117 VersionedCrd {
118 crds: vec![mz_crd],
119 stored_version: String::from("v1alpha1"),
120 },
121 VersionedCrd {
122 crds: vec![crd::balancer::v1alpha1::Balancer::crd()],
123 stored_version: String::from("v1alpha1"),
124 },
125 VersionedCrd {
126 crds: vec![crd::console::v1alpha1::Console::crd()],
127 stored_version: String::from("v1alpha1"),
128 },
129 VersionedCrd {
130 crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()],
131 stored_version: String::from("v1"),
132 },
133 ],
134 FIELD_MANAGER,
135 ),
136 )
137 .await??;
138
139 info!("Done rewriting CRDs");
140
141 Ok(())
142}