1use std::{future::ready, time::Duration};
11
12use futures::StreamExt;
13use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition;
14use kube::{
15 Api, Client, CustomResourceExt, Resource, ResourceExt,
16 api::{DeleteParams, Patch, PatchParams, PostParams},
17 core::ErrorResponse,
18 runtime::{reflector, watcher},
19};
20use serde::{Serialize, de::DeserializeOwned};
21use tracing::{info, warn};
22
23use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds};
24
/// Field manager name this module passes to the Kubernetes API: it is used
/// for the apply patches issued by `apply_resource` and handed to
/// `register_versioned_crds` when registering CRDs.
const FIELD_MANAGER: &str = "orchestratord.materialize.cloud";
26
27pub async fn get_resource<K>(api: &Api<K>, name: &str) -> Result<Option<K>, anyhow::Error>
28where
29 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
30 <K as Resource>::DynamicType: Default,
31{
32 match api.get(name).await {
33 Ok(k) => Ok(Some(k)),
34 Err(kube::Error::Api(e)) if e.code == 404 => Ok(None),
35 Err(e) => Err(e.into()),
36 }
37}
38
39pub async fn apply_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
40where
41 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
42 <K as Resource>::DynamicType: Default,
43{
44 Ok(api
45 .patch(
46 &resource.name_unchecked(),
47 &PatchParams::apply(FIELD_MANAGER).force(),
48 &Patch::Apply(resource),
49 )
50 .await?)
51}
52
53pub async fn replace_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
54where
55 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
56 <K as Resource>::DynamicType: Default,
57{
58 if resource.meta().resource_version.is_none() {
59 return Err(kube::Error::Api(ErrorResponse {
60 status: "Failure".to_string(),
61 message: "Must use apply_resource instead of replace_resource to apply fully created resources.".to_string(),
62 reason: "BadRequest".to_string(),
63 code: 400,
64 }).into());
65 }
66 Ok(api
67 .replace(&resource.name_unchecked(), &PostParams::default(), resource)
68 .await?)
69}
70
71pub async fn delete_resource<K>(api: &Api<K>, name: &str) -> Result<(), anyhow::Error>
72where
73 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
74 <K as Resource>::DynamicType: Default,
75{
76 match kube::runtime::wait::delete::delete_and_finalize(
77 api.clone(),
78 name,
79 &DeleteParams::foreground(),
80 )
81 .await
82 {
83 Ok(_) => Ok(()),
84 Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e))) if e.code == 404 => {
85 Ok(())
87 }
88 Err(e) => Err(e.into()),
89 }
90}
91
92pub async fn register_crds(
93 client: Client,
94 additional_crd_columns: Vec<CustomResourceColumnDefinition>,
95) -> Result<(), anyhow::Error> {
96 let mut mz_crd = crd::materialize::v1alpha1::Materialize::crd();
97 let default_columns = mz_crd.spec.versions[0]
98 .additional_printer_columns
99 .take()
100 .expect("should contain ImageRef and UpToDate columns");
101 mz_crd.spec.versions[0].additional_printer_columns = Some(
102 additional_crd_columns
103 .into_iter()
104 .chain(default_columns)
105 .collect(),
106 );
107 tokio::time::timeout(
108 Duration::from_secs(120),
109 register_versioned_crds(
110 client.clone(),
111 vec![
112 VersionedCrd {
113 crds: vec![mz_crd],
114 stored_version: String::from("v1alpha1"),
115 },
116 VersionedCrd {
117 crds: vec![crd::balancer::v1alpha1::Balancer::crd()],
118 stored_version: String::from("v1alpha1"),
119 },
120 VersionedCrd {
121 crds: vec![crd::console::v1alpha1::Console::crd()],
122 stored_version: String::from("v1alpha1"),
123 },
124 VersionedCrd {
125 crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()],
126 stored_version: String::from("v1"),
127 },
128 ],
129 FIELD_MANAGER,
130 ),
131 )
132 .await??;
133
134 info!("Done rewriting CRDs");
135
136 Ok(())
137}
138
139pub async fn make_reflector<K>(client: Client) -> reflector::Store<K>
140where
141 K: kube::Resource<DynamicType = ()>
142 + Clone
143 + Send
144 + Sync
145 + DeserializeOwned
146 + Serialize
147 + std::fmt::Debug
148 + 'static,
149{
150 let api = kube::Api::all(client);
151 let (store, writer) = reflector::store();
152 let reflector =
153 reflector::reflector(writer, watcher(api, watcher::Config::default().timeout(29)));
154 mz_ore::task::spawn(
155 || format!("{} reflector", K::kind(&Default::default())),
156 async {
157 reflector
158 .for_each(|res| {
159 if let Err(e) = res {
160 warn!("error in {} reflector: {}", K::kind(&Default::default()), e);
161 }
162 ready(())
163 })
164 .await
165 },
166 );
167 store.wait_until_ready().await.expect("writer dropped");
170 store
171}