mz_orchestratord/
k8s.rs
1use std::time::Duration;
11
12use kube::{
13 Api, Client, CustomResourceExt, Resource, ResourceExt,
14 api::{DeleteParams, Patch, PatchParams},
15};
16use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds};
17use serde::{Serialize, de::DeserializeOwned};
18use tracing::info;
19
20const FIELD_MANAGER: &str = "orchestratord.materialize.cloud";
21
22pub async fn get_resource<K>(api: &Api<K>, name: &str) -> Result<Option<K>, anyhow::Error>
23where
24 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
25 <K as Resource>::DynamicType: Default,
26{
27 match api.get(name).await {
28 Ok(k) => Ok(Some(k)),
29 Err(kube::Error::Api(e)) if e.code == 404 => Ok(None),
30 Err(e) => Err(e.into()),
31 }
32}
33
34pub async fn apply_resource<K>(api: &Api<K>, resource: &K) -> Result<(), anyhow::Error>
35where
36 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
37 <K as Resource>::DynamicType: Default,
38{
39 api.patch(
40 &resource.name_unchecked(),
41 &PatchParams::apply(FIELD_MANAGER).force(),
42 &Patch::Apply(resource),
43 )
44 .await?;
45 Ok(())
46}
47
48pub async fn delete_resource<K>(api: &Api<K>, name: &str) -> Result<(), anyhow::Error>
49where
50 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
51 <K as Resource>::DynamicType: Default,
52{
53 match kube::runtime::wait::delete::delete_and_finalize(
54 api.clone(),
55 name,
56 &DeleteParams::foreground(),
57 )
58 .await
59 {
60 Ok(_) => Ok(()),
61 Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e))) if e.code == 404 => {
62 Ok(())
64 }
65 Err(e) => Err(e.into()),
66 }
67}
68
69pub async fn register_crds(client: Client) -> Result<(), anyhow::Error> {
70 tokio::time::timeout(
71 Duration::from_secs(120),
72 register_versioned_crds(
73 client.clone(),
74 vec![
75 VersionedCrd {
76 crds: vec![crd::materialize::v1alpha1::Materialize::crd()],
77 stored_version: String::from("v1alpha1"),
78 },
79 VersionedCrd {
80 crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()],
81 stored_version: String::from("v1"),
82 },
83 ],
84 FIELD_MANAGER,
85 ),
86 )
87 .await??;
88
89 info!("Done rewriting CRDs");
90
91 Ok(())
92}