mz_orchestrator_kubernetes/
cloud_resource_controller.rs1use std::collections::BTreeMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use async_trait::async_trait;
17use chrono::Utc;
18use futures::stream::BoxStream;
19use futures::{StreamExt, TryFutureExt};
20use kube::api::{DeleteParams, ListParams, ObjectMeta, Patch, PatchParams};
21use kube::runtime::{WatchStreamExt, watcher};
22use kube::{Api, ResourceExt};
23use maplit::btreemap;
24use mz_ore::retry::Retry;
25use mz_repr::CatalogItemId;
26use tracing::warn;
27
28use mz_cloud_resources::crd::vpc_endpoint::v1::{
29 VpcEndpoint, VpcEndpointSpec, VpcEndpointState, VpcEndpointStatus,
30};
31use mz_cloud_resources::{
32 CloudResourceController, CloudResourceReader, VpcEndpointConfig, VpcEndpointEvent,
33};
34
35use crate::{FIELD_MANAGER, KubernetesOrchestrator, util};
36
37#[async_trait]
38impl CloudResourceController for KubernetesOrchestrator {
39 async fn ensure_vpc_endpoint(
40 &self,
41 id: CatalogItemId,
42 config: VpcEndpointConfig,
43 ) -> Result<(), anyhow::Error> {
44 let name = mz_cloud_resources::vpc_endpoint_name(id);
45 let mut labels = btreemap! {
46 "environmentd.materialize.cloud/connection-id".to_owned() => id.to_string(),
47 };
48 for (key, value) in &self.config.service_labels {
49 labels.insert(key.clone(), value.clone());
50 }
51 let vpc_endpoint = VpcEndpoint {
52 metadata: ObjectMeta {
53 labels: Some(labels),
54 name: Some(name.clone()),
55 namespace: Some(self.kubernetes_namespace.clone()),
56 ..Default::default()
57 },
58 spec: VpcEndpointSpec {
59 aws_service_name: config.aws_service_name,
60 availability_zone_ids: config.availability_zone_ids,
61 role_suffix: match &self.config.aws_external_id_prefix {
62 None => id.to_string(),
63 Some(external_id) => format!("{external_id}_{id}"),
64 },
65 },
66 status: None,
67 };
68
69 call_api(&self.vpc_endpoint_api, async |api| {
70 let endpoint = vpc_endpoint.clone();
71 api.patch(
72 &name,
73 &PatchParams::apply(FIELD_MANAGER).force(),
74 &Patch::Apply(endpoint),
75 )
76 .await
77 })
78 .await?;
79
80 Ok(())
81 }
82
83 async fn delete_vpc_endpoint(&self, id: CatalogItemId) -> Result<(), anyhow::Error> {
84 call_api(&self.vpc_endpoint_api, async |api| {
85 match api
86 .delete(
87 &mz_cloud_resources::vpc_endpoint_name(id),
88 &DeleteParams::default(),
89 )
90 .await
91 {
92 Ok(_) => Ok(()),
93 Err(kube::Error::Api(resp)) if resp.code == 404 => Ok(()),
95 Err(e) => Err(e.into()),
96 }
97 })
98 .await
99 }
100
101 async fn list_vpc_endpoints(
102 &self,
103 ) -> Result<BTreeMap<CatalogItemId, VpcEndpointStatus>, anyhow::Error> {
104 let objects = call_api(&self.vpc_endpoint_api, async |api| {
105 api.list(&ListParams::default()).await
106 })
107 .await?;
108
109 let mut endpoints = BTreeMap::new();
110 for object in objects {
111 let id = match mz_cloud_resources::id_from_vpc_endpoint_name(&object.name_any()) {
112 Some(id) => id,
113 None => continue,
115 };
116 endpoints.insert(id, object.status.unwrap_or_default());
117 }
118 Ok(endpoints)
119 }
120
121 async fn watch_vpc_endpoints(&self) -> BoxStream<'static, VpcEndpointEvent> {
122 let stream = watcher(
123 self.vpc_endpoint_api.clone(),
124 watcher::Config::default().timeout(59),
126 )
127 .touched_objects()
128 .filter_map(|object| async move {
129 match object {
130 Ok(vpce) => {
131 let connection_id =
132 mz_cloud_resources::id_from_vpc_endpoint_name(&vpce.name_any())?;
133
134 if let Some(state) = vpce.status.as_ref().and_then(|st| st.state.to_owned()) {
135 Some(VpcEndpointEvent {
136 connection_id,
137 status: state,
138 time: vpce
141 .status
142 .unwrap()
143 .conditions
144 .and_then(|c| c.into_iter().find(|c| &c.type_ == "Available"))
145 .and_then(|condition| Some(condition.last_transition_time.0))
146 .unwrap_or_else(Utc::now),
147 })
148 } else {
149 Some(VpcEndpointEvent {
154 connection_id,
155 status: VpcEndpointState::Unknown,
156 time: vpce.creation_timestamp()?.0,
157 })
158 }
159 }
163 Err(error) => {
164 tracing::warn!("vpc endpoint watch error: {error}");
167 None
168 }
169 }
170 });
171 Box::pin(stream)
172 }
173
174 fn reader(&self) -> Arc<dyn CloudResourceReader> {
175 let reader = Arc::clone(&self.resource_reader);
176 reader
177 }
178}
179
180#[async_trait]
181impl CloudResourceReader for KubernetesOrchestrator {
182 async fn read(&self, id: CatalogItemId) -> Result<VpcEndpointStatus, anyhow::Error> {
183 self.resource_reader.read(id).await
184 }
185}
186
187#[derive(Debug)]
189pub struct KubernetesResourceReader {
190 vpc_endpoint_api: Api<VpcEndpoint>,
191}
192
193impl KubernetesResourceReader {
194 pub async fn new(context: String) -> Result<KubernetesResourceReader, anyhow::Error> {
199 let (client, _) = util::create_client(context).await?;
200 let vpc_endpoint_api: Api<VpcEndpoint> = Api::default_namespaced(client);
201 Ok(KubernetesResourceReader { vpc_endpoint_api })
202 }
203}
204
205#[async_trait]
206impl CloudResourceReader for KubernetesResourceReader {
207 async fn read(&self, id: CatalogItemId) -> Result<VpcEndpointStatus, anyhow::Error> {
208 let name = mz_cloud_resources::vpc_endpoint_name(id);
209 let endpoint = call_api(&self.vpc_endpoint_api, |api| api.get(&name)).await?;
210 Ok(endpoint.status.unwrap_or_default())
211 }
212}
213
214async fn call_api<'a, K, F, U, T, E>(api: &'a Api<K>, f: F) -> Result<T, E>
216where
217 F: Fn(&'a Api<K>) -> U + 'a,
218 U: Future<Output = Result<T, E>>,
219 E: std::fmt::Display,
220{
221 Retry::default()
222 .clamp_backoff(Duration::from_secs(10))
223 .max_duration(Duration::from_secs(60 * 10))
224 .retry_async(|_| f(api).inspect_err(|e| warn!("VPC endpoint API call failed: {e}")))
225 .await
226}