mz_orchestrator_kubernetes/
cloud_resource_controller.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Management of K8S objects, such as VpcEndpoints.
11
12use std::collections::BTreeMap;
13use std::sync::Arc;
14
15use async_trait::async_trait;
16use chrono::Utc;
17use futures::StreamExt;
18use futures::stream::BoxStream;
19use kube::api::{DeleteParams, ListParams, ObjectMeta, Patch, PatchParams};
20use kube::runtime::{WatchStreamExt, watcher};
21use kube::{Api, ResourceExt};
22use maplit::btreemap;
23use mz_repr::CatalogItemId;
24
25use mz_cloud_resources::crd::vpc_endpoint::v1::{
26    VpcEndpoint, VpcEndpointSpec, VpcEndpointState, VpcEndpointStatus,
27};
28use mz_cloud_resources::{
29    CloudResourceController, CloudResourceReader, VpcEndpointConfig, VpcEndpointEvent,
30};
31
32use crate::{FIELD_MANAGER, KubernetesOrchestrator, util};
33
34#[async_trait]
35impl CloudResourceController for KubernetesOrchestrator {
36    async fn ensure_vpc_endpoint(
37        &self,
38        id: CatalogItemId,
39        config: VpcEndpointConfig,
40    ) -> Result<(), anyhow::Error> {
41        let name = mz_cloud_resources::vpc_endpoint_name(id);
42        let mut labels = btreemap! {
43            "environmentd.materialize.cloud/connection-id".to_owned() => id.to_string(),
44        };
45        for (key, value) in &self.config.service_labels {
46            labels.insert(key.clone(), value.clone());
47        }
48        let vpc_endpoint = VpcEndpoint {
49            metadata: ObjectMeta {
50                labels: Some(labels),
51                name: Some(name.clone()),
52                namespace: Some(self.kubernetes_namespace.clone()),
53                ..Default::default()
54            },
55            spec: VpcEndpointSpec {
56                aws_service_name: config.aws_service_name,
57                availability_zone_ids: config.availability_zone_ids,
58                role_suffix: match &self.config.aws_external_id_prefix {
59                    None => id.to_string(),
60                    Some(external_id) => format!("{external_id}_{id}"),
61                },
62            },
63            status: None,
64        };
65        self.vpc_endpoint_api
66            .patch(
67                &name,
68                &PatchParams::apply(FIELD_MANAGER).force(),
69                &Patch::Apply(vpc_endpoint),
70            )
71            .await?;
72        Ok(())
73    }
74
75    async fn delete_vpc_endpoint(&self, id: CatalogItemId) -> Result<(), anyhow::Error> {
76        match self
77            .vpc_endpoint_api
78            .delete(
79                &mz_cloud_resources::vpc_endpoint_name(id),
80                &DeleteParams::default(),
81            )
82            .await
83        {
84            Ok(_) => Ok(()),
85            // Ignore already deleted endpoints.
86            Err(kube::Error::Api(resp)) if resp.code == 404 => Ok(()),
87            Err(e) => Err(e.into()),
88        }
89    }
90
91    async fn list_vpc_endpoints(
92        &self,
93    ) -> Result<BTreeMap<CatalogItemId, VpcEndpointStatus>, anyhow::Error> {
94        let objects = self.vpc_endpoint_api.list(&ListParams::default()).await?;
95        let mut endpoints = BTreeMap::new();
96        for object in objects {
97            let id = match mz_cloud_resources::id_from_vpc_endpoint_name(&object.name_any()) {
98                Some(id) => id,
99                // Ignore any object whose name can't be parsed as a GlobalId
100                None => continue,
101            };
102            endpoints.insert(id, object.status.unwrap_or_default());
103        }
104        Ok(endpoints)
105    }
106
107    async fn watch_vpc_endpoints(&self) -> BoxStream<'static, VpcEndpointEvent> {
108        let stream = watcher(self.vpc_endpoint_api.clone(), watcher::Config::default())
109            .touched_objects()
110            .filter_map(|object| async move {
111                match object {
112                    Ok(vpce) => {
113                        let connection_id =
114                            mz_cloud_resources::id_from_vpc_endpoint_name(&vpce.name_any())?;
115
116                        if let Some(state) = vpce.status.as_ref().and_then(|st| st.state.to_owned())
117                        {
118                            Some(VpcEndpointEvent {
119                                connection_id,
120                                status: state,
121                                // Use the 'Available' Condition on the VPCE Status to set the event-time, falling back
122                                // to now if it's not set
123                                time: vpce
124                                    .status
125                                    .unwrap()
126                                    .conditions
127                                    .and_then(|c| c.into_iter().find(|c| &c.type_ == "Available"))
128                                    .and_then(|condition| Some(condition.last_transition_time.0))
129                                    .unwrap_or_else(Utc::now),
130                            })
131                        } else {
132                            // The Status/State is not yet populated on the VpcEndpoint, which means it was just
133                            // initialized and hasn't yet been reconciled by the environment-controller
134                            // We return an event with an 'unknown' state so that watchers know the VpcEndpoint was created
135                            // even if we don't yet have an accurate status
136                            Some(VpcEndpointEvent {
137                                connection_id,
138                                status: VpcEndpointState::Unknown,
139                                time: vpce.creation_timestamp()?.0,
140                            })
141                        }
142                        // TODO: Should we also check for the deletion_timestamp on the vpce? That would indicate that the
143                        // resource is about to be deleted; however there is already a 'deleted' enum val on VpcEndpointState
144                        // which refers to the state of the customer's VPC Endpoint Service, so we'd need to introduce a new state val
145                    }
146                    Err(error) => {
147                        // We assume that errors returned by Kubernetes are usually transient, so we
148                        // just log a warning and ignore them otherwise.
149                        tracing::warn!("vpc endpoint watch error: {error}");
150                        None
151                    }
152                }
153            });
154        Box::pin(stream)
155    }
156
157    fn reader(&self) -> Arc<dyn CloudResourceReader> {
158        let reader = Arc::clone(&self.resource_reader);
159        reader
160    }
161}
162
163#[async_trait]
164impl CloudResourceReader for KubernetesOrchestrator {
165    async fn read(&self, id: CatalogItemId) -> Result<VpcEndpointStatus, anyhow::Error> {
166        self.resource_reader.read(id).await
167    }
168}
169
170/// Reads cloud resources managed by a [`KubernetesOrchestrator`].
171#[derive(Debug)]
172pub struct KubernetesResourceReader {
173    vpc_endpoint_api: Api<VpcEndpoint>,
174}
175
176impl KubernetesResourceReader {
177    /// Constructs a new Kubernetes cloud resource reader.
178    ///
179    /// The `context` parameter works like
180    /// [`KubernetesOrchestratorConfig::context`](crate::KubernetesOrchestratorConfig::context).
181    pub async fn new(context: String) -> Result<KubernetesResourceReader, anyhow::Error> {
182        let (client, _) = util::create_client(context).await?;
183        let vpc_endpoint_api: Api<VpcEndpoint> = Api::default_namespaced(client);
184        Ok(KubernetesResourceReader { vpc_endpoint_api })
185    }
186}
187
188#[async_trait]
189impl CloudResourceReader for KubernetesResourceReader {
190    async fn read(&self, id: CatalogItemId) -> Result<VpcEndpointStatus, anyhow::Error> {
191        let name = mz_cloud_resources::vpc_endpoint_name(id);
192        let endpoint = self.vpc_endpoint_api.get(&name).await?;
193        Ok(endpoint.status.unwrap_or_default())
194    }
195}