1use std::{future::ready, time::Duration};
11
12use futures::StreamExt;
13use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition;
14use kube::{
15 Api, Client, CustomResourceExt, Resource, ResourceExt,
16 api::{DeleteParams, Patch, PatchParams, PostParams},
17 core::Status,
18 core::response::StatusSummary,
19 runtime::{reflector, watcher},
20};
21use serde::{Serialize, de::DeserializeOwned};
22use tracing::{info, warn};
23
24use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds};
25
/// Field manager name passed to server-side apply patches and to CRD
/// registration in this module.
const FIELD_MANAGER: &str = "orchestratord.materialize.cloud";
27
28pub async fn get_resource<K>(api: &Api<K>, name: &str) -> Result<Option<K>, anyhow::Error>
29where
30 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
31 <K as Resource>::DynamicType: Default,
32{
33 match api.get(name).await {
34 Ok(k) => Ok(Some(k)),
35 Err(kube::Error::Api(e)) if e.code == 404 => Ok(None),
36 Err(e) => Err(e.into()),
37 }
38}
39
40pub async fn apply_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
41where
42 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
43 <K as Resource>::DynamicType: Default,
44{
45 Ok(api
46 .patch(
47 &resource.name_unchecked(),
48 &PatchParams::apply(FIELD_MANAGER).force(),
49 &Patch::Apply(resource),
50 )
51 .await?)
52}
53
54pub async fn replace_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
55where
56 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
57 <K as Resource>::DynamicType: Default,
58{
59 if resource.meta().resource_version.is_none() {
60 return Err(kube::Error::Api(
61 Box::new(Status {
62 status: Some(StatusSummary::Failure),
63 message: "Must use apply_resource instead of replace_resource to apply fully created resources.".to_string(),
64 reason: "BadRequest".to_string(),
65 code: 400,
66 metadata: None,
67 details: None,
68 }),
69 )
70 .into());
71 }
72 Ok(api
73 .replace(&resource.name_unchecked(), &PostParams::default(), resource)
74 .await?)
75}
76
77pub async fn delete_resource<K>(api: &Api<K>, name: &str) -> Result<(), anyhow::Error>
78where
79 K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
80 <K as Resource>::DynamicType: Default,
81{
82 match kube::runtime::wait::delete::delete_and_finalize(
83 api.clone(),
84 name,
85 &DeleteParams::foreground(),
86 )
87 .await
88 {
89 Ok(_) => Ok(()),
90 Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e))) if e.code == 404 => {
91 Ok(())
93 }
94 Err(e) => Err(e.into()),
95 }
96}
97
98pub async fn register_crds(
99 client: Client,
100 additional_crd_columns: Vec<CustomResourceColumnDefinition>,
101) -> Result<(), anyhow::Error> {
102 let mut mz_crd = crd::materialize::v1alpha1::Materialize::crd();
103 let default_columns = mz_crd.spec.versions[0]
104 .additional_printer_columns
105 .take()
106 .expect("should contain ImageRef and UpToDate columns");
107 mz_crd.spec.versions[0].additional_printer_columns = Some(
108 additional_crd_columns
109 .into_iter()
110 .chain(default_columns)
111 .collect(),
112 );
113 tokio::time::timeout(
114 Duration::from_secs(120),
115 register_versioned_crds(
116 client.clone(),
117 vec![
118 VersionedCrd {
119 crds: vec![mz_crd],
120 stored_version: String::from("v1alpha1"),
121 },
122 VersionedCrd {
123 crds: vec![crd::balancer::v1alpha1::Balancer::crd()],
124 stored_version: String::from("v1alpha1"),
125 },
126 VersionedCrd {
127 crds: vec![crd::console::v1alpha1::Console::crd()],
128 stored_version: String::from("v1alpha1"),
129 },
130 VersionedCrd {
131 crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()],
132 stored_version: String::from("v1"),
133 },
134 ],
135 FIELD_MANAGER,
136 ),
137 )
138 .await??;
139
140 info!("Done rewriting CRDs");
141
142 Ok(())
143}
144
145pub async fn make_reflector<K>(client: Client) -> reflector::Store<K>
146where
147 K: kube::Resource<DynamicType = ()>
148 + Clone
149 + Send
150 + Sync
151 + DeserializeOwned
152 + Serialize
153 + std::fmt::Debug
154 + 'static,
155{
156 let api = kube::Api::all(client);
157 let (store, writer) = reflector::store();
158 let reflector =
159 reflector::reflector(writer, watcher(api, watcher::Config::default().timeout(29)));
160 mz_ore::task::spawn(
161 || format!("{} reflector", K::kind(&Default::default())),
162 async {
163 reflector
164 .for_each(|res| {
165 if let Err(e) = res {
166 warn!("error in {} reflector: {}", K::kind(&Default::default()), e);
167 }
168 ready(())
169 })
170 .await
171 },
172 );
173 store.wait_until_ready().await.expect("writer dropped");
176 store
177}