1use std::{future::ready, time::Duration};
11
12use apiextensions::v1::CustomResourceColumnDefinition;
13use futures::StreamExt;
14use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions;
15use kube::{
16 Api, Client, CustomResourceExt, Resource, ResourceExt,
17 api::{DeleteParams, Patch, PatchParams, PostParams},
18 core::Status,
19 core::response::StatusSummary,
20 runtime::{reflector, watcher},
21};
22use serde::{Serialize, de::DeserializeOwned};
23use tracing::{info, warn};
24
25use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds};
26
/// Field manager name sent with the apply patches issued by this module and
/// passed along when registering the CRDs.
const FIELD_MANAGER: &str = "orchestratord.materialize.cloud";
28
29pub async fn get_resource<K>(api: &Api<K>, name: &str) -> Result<Option<K>, anyhow::Error>
30where
31 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
32 <K as Resource>::DynamicType: Default,
33{
34 match api.get(name).await {
35 Ok(k) => Ok(Some(k)),
36 Err(kube::Error::Api(e)) if e.code == 404 => Ok(None),
37 Err(e) => Err(e.into()),
38 }
39}
40
41pub async fn apply_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
42where
43 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
44 <K as Resource>::DynamicType: Default,
45{
46 Ok(api
47 .patch(
48 &resource.name_unchecked(),
49 &PatchParams::apply(FIELD_MANAGER).force(),
50 &Patch::Apply(resource),
51 )
52 .await?)
53}
54
55pub async fn replace_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
56where
57 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
58 <K as Resource>::DynamicType: Default,
59{
60 if resource.meta().resource_version.is_none() {
61 return Err(kube::Error::Api(
62 Box::new(Status {
63 status: Some(StatusSummary::Failure),
64 message: "Must use apply_resource instead of replace_resource to apply fully created resources.".to_string(),
65 reason: "BadRequest".to_string(),
66 code: 400,
67 metadata: None,
68 details: None,
69 }),
70 )
71 .into());
72 }
73 Ok(api
74 .replace(&resource.name_unchecked(), &PostParams::default(), resource)
75 .await?)
76}
77
78pub async fn delete_resource<K>(api: &Api<K>, name: &str) -> Result<(), anyhow::Error>
79where
80 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
81 <K as Resource>::DynamicType: Default,
82{
83 match kube::runtime::wait::delete::delete_and_finalize(
84 api.clone(),
85 name,
86 &DeleteParams::foreground(),
87 )
88 .await
89 {
90 Ok(_) => Ok(()),
91 Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e))) if e.code == 404 => {
92 Ok(())
94 }
95 Err(e) => Err(e.into()),
96 }
97}
98
99pub async fn register_crds(
100 client: Client,
101 additional_crd_columns: Vec<CustomResourceColumnDefinition>,
102) -> Result<(), anyhow::Error> {
103 let mut mz_crd = crd::materialize::v1alpha1::Materialize::crd();
104 let default_columns = mz_crd.spec.versions[0]
105 .additional_printer_columns
106 .take()
107 .expect("should contain ImageRef and UpToDate columns");
108 mz_crd.spec.versions[0].additional_printer_columns = Some(
109 additional_crd_columns
110 .into_iter()
111 .chain(default_columns)
112 .collect(),
113 );
114 tokio::time::timeout(
115 Duration::from_secs(120),
116 register_versioned_crds(
117 client.clone(),
118 vec![
119 VersionedCrd {
120 crds: vec![mz_crd],
121 stored_version: String::from("v1alpha1"),
122 },
123 VersionedCrd {
124 crds: vec![crd::balancer::v1alpha1::Balancer::crd()],
125 stored_version: String::from("v1alpha1"),
126 },
127 VersionedCrd {
128 crds: vec![crd::console::v1alpha1::Console::crd()],
129 stored_version: String::from("v1alpha1"),
130 },
131 VersionedCrd {
132 crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()],
133 stored_version: String::from("v1"),
134 },
135 ],
136 FIELD_MANAGER,
137 ),
138 )
139 .await??;
140
141 info!("Done rewriting CRDs");
142
143 Ok(())
144}
145
146pub async fn make_reflector<K>(client: Client) -> reflector::Store<K>
147where
148 K: kube::Resource<DynamicType = ()>
149 + Clone
150 + Send
151 + Sync
152 + DeserializeOwned
153 + Serialize
154 + std::fmt::Debug
155 + 'static,
156{
157 let api = kube::Api::all(client);
158 let (store, writer) = reflector::store();
159 let reflector =
160 reflector::reflector(writer, watcher(api, watcher::Config::default().timeout(29)));
161 mz_ore::task::spawn(
162 || format!("{} reflector", K::kind(&Default::default())),
163 async {
164 reflector
165 .for_each(|res| {
166 if let Err(e) = res {
167 warn!("error in {} reflector: {}", K::kind(&Default::default()), e);
168 }
169 ready(())
170 })
171 .await
172 },
173 );
174 store.wait_until_ready().await.expect("writer dropped");
177 store
178}