1use std::time::Duration;
11
12use k8s_openapi::{
13 ByteString,
14 apiextensions_apiserver::pkg::apis::apiextensions::v1::{
15 CustomResourceColumnDefinition, CustomResourceConversion, ServiceReference,
16 WebhookClientConfig, WebhookConversion,
17 },
18};
19use kube::{
20 Api, Client, CustomResourceExt, Resource, ResourceExt,
21 api::{DeleteParams, Patch, PatchParams, PostParams},
22 core::Status,
23 core::response::StatusSummary,
24};
25use serde::{Serialize, de::DeserializeOwned};
26use tracing::info;
27
28use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds};
29
30const FIELD_MANAGER: &str = "orchestratord.materialize.cloud";
31
32pub async fn get_resource<K>(api: &Api<K>, name: &str) -> Result<Option<K>, anyhow::Error>
33where
34 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
35 <K as Resource>::DynamicType: Default,
36{
37 match api.get(name).await {
38 Ok(k) => Ok(Some(k)),
39 Err(kube::Error::Api(e)) if e.code == 404 => Ok(None),
40 Err(e) => Err(e.into()),
41 }
42}
43
44pub async fn apply_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
45where
46 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
47 <K as Resource>::DynamicType: Default,
48{
49 Ok(api
50 .patch(
51 &resource.name_unchecked(),
52 &PatchParams::apply(FIELD_MANAGER).force(),
53 &Patch::Apply(resource),
54 )
55 .await?)
56}
57
58pub async fn replace_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
59where
60 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
61 <K as Resource>::DynamicType: Default,
62{
63 if resource.meta().resource_version.is_none() {
64 return Err(kube::Error::Api(
65 Box::new(Status {
66 status: Some(StatusSummary::Failure),
67 message: "Must use apply_resource instead of replace_resource to apply fully created resources.".to_string(),
68 reason: "BadRequest".to_string(),
69 code: 400,
70 metadata: None,
71 details: None,
72 }),
73 )
74 .into());
75 }
76 Ok(api
77 .replace(&resource.name_unchecked(), &PostParams::default(), resource)
78 .await?)
79}
80
81pub async fn delete_resource<K>(api: &Api<K>, name: &str) -> Result<(), anyhow::Error>
82where
83 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
84 <K as Resource>::DynamicType: Default,
85{
86 match kube::runtime::wait::delete::delete_and_finalize(
87 api.clone(),
88 name,
89 &DeleteParams::foreground(),
90 )
91 .await
92 {
93 Ok(_) => Ok(()),
94 Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e))) if e.code == 404 => {
95 Ok(())
97 }
98 Err(e) => Err(e.into()),
99 }
100}
101
/// Settings describing the CRD conversion webhook endpoint.
///
/// `register_crds` turns these values into the webhook client configuration
/// attached to the Materialize CRDs.
#[derive(Clone, Debug)]
pub struct ConversionWebhookConfig {
    /// Name of the Service placed in the webhook's service reference.
    pub service_name: String,
    /// Namespace of the Service placed in the webhook's service reference.
    pub service_namespace: String,
    /// Port of the Service placed in the webhook's service reference.
    pub service_port: u16,
    /// Filesystem path whose raw bytes are used as the webhook's CA bundle.
    pub ca_cert_path: String,
}
113
114pub async fn register_crds(
115 client: Client,
116 additional_crd_columns: Vec<CustomResourceColumnDefinition>,
117 conversion_webhook: Option<ConversionWebhookConfig>,
118) -> Result<(), anyhow::Error> {
119 let (mut mz_crds, mz_conversion) = match conversion_webhook {
120 Some(config) => {
121 let ca_bytes = tokio::fs::read(config.ca_cert_path).await?;
122 let conversion = CustomResourceConversion {
123 strategy: "Webhook".to_owned(),
124 webhook: Some(WebhookConversion {
125 client_config: Some(WebhookClientConfig {
126 ca_bundle: Some(ByteString(ca_bytes)),
127 service: Some(ServiceReference {
128 name: config.service_name,
129 namespace: config.service_namespace,
130 path: Some("/convert".to_owned()),
131 port: Some(config.service_port.into()),
132 }),
133 url: None,
134 }),
135 conversion_review_versions: vec!["v1".to_owned()],
136 }),
137 };
138 (
139 vec![
140 crd::materialize::v1::Materialize::crd(),
141 crd::materialize::v1alpha1::Materialize::crd(),
142 ],
143 Some(conversion),
144 )
145 }
146 None => (vec![crd::materialize::v1alpha1::Materialize::crd()], None),
147 };
148 let default_columns = mz_crds[0].spec.versions[0]
149 .additional_printer_columns
150 .take()
151 .expect("should contain ImageRef and UpToDate columns");
152 mz_crds[0].spec.versions[0].additional_printer_columns = Some(
153 additional_crd_columns
154 .into_iter()
155 .chain(default_columns)
156 .collect(),
157 );
158 tokio::time::timeout(
159 Duration::from_secs(120),
160 register_versioned_crds(
161 client.clone(),
162 vec![
163 VersionedCrd {
164 crds: mz_crds,
165 stored_version: String::from("v1alpha1"),
166 conversion: mz_conversion,
167 },
168 VersionedCrd {
169 crds: vec![crd::balancer::v1alpha1::Balancer::crd()],
170 stored_version: String::from("v1alpha1"),
171 conversion: None,
172 },
173 VersionedCrd {
174 crds: vec![crd::console::v1alpha1::Console::crd()],
175 stored_version: String::from("v1alpha1"),
176 conversion: None,
177 },
178 VersionedCrd {
179 crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()],
180 stored_version: String::from("v1"),
181 conversion: None,
182 },
183 ],
184 FIELD_MANAGER,
185 ),
186 )
187 .await??;
188
189 info!("Done rewriting CRDs");
190
191 Ok(())
192}