mz_orchestrator_kubernetes/
cloud_resource_controller.rs1use std::collections::BTreeMap;
13use std::sync::Arc;
14
15use async_trait::async_trait;
16use chrono::Utc;
17use futures::StreamExt;
18use futures::stream::BoxStream;
19use kube::api::{DeleteParams, ListParams, ObjectMeta, Patch, PatchParams};
20use kube::runtime::{WatchStreamExt, watcher};
21use kube::{Api, ResourceExt};
22use maplit::btreemap;
23use mz_repr::CatalogItemId;
24
25use mz_cloud_resources::crd::vpc_endpoint::v1::{
26 VpcEndpoint, VpcEndpointSpec, VpcEndpointState, VpcEndpointStatus,
27};
28use mz_cloud_resources::{
29 CloudResourceController, CloudResourceReader, VpcEndpointConfig, VpcEndpointEvent,
30};
31
32use crate::{FIELD_MANAGER, KubernetesOrchestrator, util};
33
34#[async_trait]
35impl CloudResourceController for KubernetesOrchestrator {
36 async fn ensure_vpc_endpoint(
37 &self,
38 id: CatalogItemId,
39 config: VpcEndpointConfig,
40 ) -> Result<(), anyhow::Error> {
41 let name = mz_cloud_resources::vpc_endpoint_name(id);
42 let mut labels = btreemap! {
43 "environmentd.materialize.cloud/connection-id".to_owned() => id.to_string(),
44 };
45 for (key, value) in &self.config.service_labels {
46 labels.insert(key.clone(), value.clone());
47 }
48 let vpc_endpoint = VpcEndpoint {
49 metadata: ObjectMeta {
50 labels: Some(labels),
51 name: Some(name.clone()),
52 namespace: Some(self.kubernetes_namespace.clone()),
53 ..Default::default()
54 },
55 spec: VpcEndpointSpec {
56 aws_service_name: config.aws_service_name,
57 availability_zone_ids: config.availability_zone_ids,
58 role_suffix: match &self.config.aws_external_id_prefix {
59 None => id.to_string(),
60 Some(external_id) => format!("{external_id}_{id}"),
61 },
62 },
63 status: None,
64 };
65 self.vpc_endpoint_api
66 .patch(
67 &name,
68 &PatchParams::apply(FIELD_MANAGER).force(),
69 &Patch::Apply(vpc_endpoint),
70 )
71 .await?;
72 Ok(())
73 }
74
75 async fn delete_vpc_endpoint(&self, id: CatalogItemId) -> Result<(), anyhow::Error> {
76 match self
77 .vpc_endpoint_api
78 .delete(
79 &mz_cloud_resources::vpc_endpoint_name(id),
80 &DeleteParams::default(),
81 )
82 .await
83 {
84 Ok(_) => Ok(()),
85 Err(kube::Error::Api(resp)) if resp.code == 404 => Ok(()),
87 Err(e) => Err(e.into()),
88 }
89 }
90
91 async fn list_vpc_endpoints(
92 &self,
93 ) -> Result<BTreeMap<CatalogItemId, VpcEndpointStatus>, anyhow::Error> {
94 let objects = self.vpc_endpoint_api.list(&ListParams::default()).await?;
95 let mut endpoints = BTreeMap::new();
96 for object in objects {
97 let id = match mz_cloud_resources::id_from_vpc_endpoint_name(&object.name_any()) {
98 Some(id) => id,
99 None => continue,
101 };
102 endpoints.insert(id, object.status.unwrap_or_default());
103 }
104 Ok(endpoints)
105 }
106
107 async fn watch_vpc_endpoints(&self) -> BoxStream<'static, VpcEndpointEvent> {
108 let stream = watcher(self.vpc_endpoint_api.clone(), watcher::Config::default())
109 .touched_objects()
110 .filter_map(|object| async move {
111 match object {
112 Ok(vpce) => {
113 let connection_id =
114 mz_cloud_resources::id_from_vpc_endpoint_name(&vpce.name_any())?;
115
116 if let Some(state) = vpce.status.as_ref().and_then(|st| st.state.to_owned())
117 {
118 Some(VpcEndpointEvent {
119 connection_id,
120 status: state,
121 time: vpce
124 .status
125 .unwrap()
126 .conditions
127 .and_then(|c| c.into_iter().find(|c| &c.type_ == "Available"))
128 .and_then(|condition| Some(condition.last_transition_time.0))
129 .unwrap_or_else(Utc::now),
130 })
131 } else {
132 Some(VpcEndpointEvent {
137 connection_id,
138 status: VpcEndpointState::Unknown,
139 time: vpce.creation_timestamp()?.0,
140 })
141 }
142 }
146 Err(error) => {
147 tracing::warn!("vpc endpoint watch error: {error}");
150 None
151 }
152 }
153 });
154 Box::pin(stream)
155 }
156
157 fn reader(&self) -> Arc<dyn CloudResourceReader> {
158 let reader = Arc::clone(&self.resource_reader);
159 reader
160 }
161}
162
163#[async_trait]
164impl CloudResourceReader for KubernetesOrchestrator {
165 async fn read(&self, id: CatalogItemId) -> Result<VpcEndpointStatus, anyhow::Error> {
166 self.resource_reader.read(id).await
167 }
168}
169
170#[derive(Debug)]
172pub struct KubernetesResourceReader {
173 vpc_endpoint_api: Api<VpcEndpoint>,
174}
175
176impl KubernetesResourceReader {
177 pub async fn new(context: String) -> Result<KubernetesResourceReader, anyhow::Error> {
182 let (client, _) = util::create_client(context).await?;
183 let vpc_endpoint_api: Api<VpcEndpoint> = Api::default_namespaced(client);
184 Ok(KubernetesResourceReader { vpc_endpoint_api })
185 }
186}
187
188#[async_trait]
189impl CloudResourceReader for KubernetesResourceReader {
190 async fn read(&self, id: CatalogItemId) -> Result<VpcEndpointStatus, anyhow::Error> {
191 let name = mz_cloud_resources::vpc_endpoint_name(id);
192 let endpoint = self.vpc_endpoint_api.get(&name).await?;
193 Ok(endpoint.status.unwrap_or_default())
194 }
195}