1use std::{future::ready, time::Duration};
11
12use futures::StreamExt;
13use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition;
14use kube::{
15 Api, Client, CustomResourceExt, Resource, ResourceExt,
16 api::{DeleteParams, Patch, PatchParams},
17 runtime::{reflector, watcher},
18};
19use serde::{Serialize, de::DeserializeOwned};
20use tracing::{info, warn};
21
22use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds};
23
/// Field manager name this module hands to the Kubernetes API: it is used for
/// the apply patches issued by `apply_resource` and is passed through to
/// `register_versioned_crds` when registering CRDs.
const FIELD_MANAGER: &str = "orchestratord.materialize.cloud";
25
26pub async fn get_resource<K>(api: &Api<K>, name: &str) -> Result<Option<K>, anyhow::Error>
27where
28 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
29 <K as Resource>::DynamicType: Default,
30{
31 match api.get(name).await {
32 Ok(k) => Ok(Some(k)),
33 Err(kube::Error::Api(e)) if e.code == 404 => Ok(None),
34 Err(e) => Err(e.into()),
35 }
36}
37
38pub async fn apply_resource<K>(api: &Api<K>, resource: &K) -> Result<(), anyhow::Error>
39where
40 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
41 <K as Resource>::DynamicType: Default,
42{
43 api.patch(
44 &resource.name_unchecked(),
45 &PatchParams::apply(FIELD_MANAGER).force(),
46 &Patch::Apply(resource),
47 )
48 .await?;
49 Ok(())
50}
51
52pub async fn delete_resource<K>(api: &Api<K>, name: &str) -> Result<(), anyhow::Error>
53where
54 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
55 <K as Resource>::DynamicType: Default,
56{
57 match kube::runtime::wait::delete::delete_and_finalize(
58 api.clone(),
59 name,
60 &DeleteParams::foreground(),
61 )
62 .await
63 {
64 Ok(_) => Ok(()),
65 Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e))) if e.code == 404 => {
66 Ok(())
68 }
69 Err(e) => Err(e.into()),
70 }
71}
72
73pub async fn register_crds(
74 client: Client,
75 additional_crd_columns: Vec<CustomResourceColumnDefinition>,
76) -> Result<(), anyhow::Error> {
77 let mut mz_crd = crd::materialize::v1alpha1::Materialize::crd();
78 let default_columns = mz_crd.spec.versions[0]
79 .additional_printer_columns
80 .take()
81 .expect("should contain ImageRef and UpToDate columns");
82 mz_crd.spec.versions[0].additional_printer_columns = Some(
83 additional_crd_columns
84 .into_iter()
85 .chain(default_columns)
86 .collect(),
87 );
88 tokio::time::timeout(
89 Duration::from_secs(120),
90 register_versioned_crds(
91 client.clone(),
92 vec![
93 VersionedCrd {
94 crds: vec![mz_crd],
95 stored_version: String::from("v1alpha1"),
96 },
97 VersionedCrd {
98 crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()],
99 stored_version: String::from("v1"),
100 },
101 ],
102 FIELD_MANAGER,
103 ),
104 )
105 .await??;
106
107 info!("Done rewriting CRDs");
108
109 Ok(())
110}
111
112pub async fn make_reflector<K>(client: Client) -> reflector::Store<K>
113where
114 K: kube::Resource<DynamicType = ()>
115 + Clone
116 + Send
117 + Sync
118 + DeserializeOwned
119 + Serialize
120 + std::fmt::Debug
121 + 'static,
122{
123 let api = kube::Api::all(client);
124 let (store, writer) = reflector::store();
125 let reflector =
126 reflector::reflector(writer, watcher(api, watcher::Config::default().timeout(29)));
127 mz_ore::task::spawn(
128 || format!("{} reflector", K::kind(&Default::default())),
129 async {
130 reflector
131 .for_each(|res| {
132 if let Err(e) = res {
133 warn!("error in {} reflector: {}", K::kind(&Default::default()), e);
134 }
135 ready(())
136 })
137 .await
138 },
139 );
140 store.wait_until_ready().await.expect("writer dropped");
143 store
144}