mz_orchestrator_kubernetes/
cloud_resource_controller.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Management of K8S objects, such as VpcEndpoints.
11
12use std::collections::BTreeMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use async_trait::async_trait;
17use chrono::Utc;
18use futures::stream::BoxStream;
19use futures::{StreamExt, TryFutureExt};
20use kube::api::{DeleteParams, ListParams, ObjectMeta, Patch, PatchParams};
21use kube::runtime::{WatchStreamExt, watcher};
22use kube::{Api, ResourceExt};
23use maplit::btreemap;
24use mz_ore::retry::Retry;
25use mz_repr::CatalogItemId;
26use tracing::warn;
27
28use mz_cloud_resources::crd::vpc_endpoint::v1::{
29    VpcEndpoint, VpcEndpointSpec, VpcEndpointState, VpcEndpointStatus,
30};
31use mz_cloud_resources::{
32    CloudResourceController, CloudResourceReader, VpcEndpointConfig, VpcEndpointEvent,
33};
34
35use crate::{FIELD_MANAGER, KubernetesOrchestrator, util};
36
37#[async_trait]
38impl CloudResourceController for KubernetesOrchestrator {
39    async fn ensure_vpc_endpoint(
40        &self,
41        id: CatalogItemId,
42        config: VpcEndpointConfig,
43    ) -> Result<(), anyhow::Error> {
44        let name = mz_cloud_resources::vpc_endpoint_name(id);
45        let mut labels = btreemap! {
46            "environmentd.materialize.cloud/connection-id".to_owned() => id.to_string(),
47        };
48        for (key, value) in &self.config.service_labels {
49            labels.insert(key.clone(), value.clone());
50        }
51        let vpc_endpoint = VpcEndpoint {
52            metadata: ObjectMeta {
53                labels: Some(labels),
54                name: Some(name.clone()),
55                namespace: Some(self.kubernetes_namespace.clone()),
56                ..Default::default()
57            },
58            spec: VpcEndpointSpec {
59                aws_service_name: config.aws_service_name,
60                availability_zone_ids: config.availability_zone_ids,
61                role_suffix: match &self.config.aws_external_id_prefix {
62                    None => id.to_string(),
63                    Some(external_id) => format!("{external_id}_{id}"),
64                },
65            },
66            status: None,
67        };
68
69        call_api(&self.vpc_endpoint_api, async |api| {
70            let endpoint = vpc_endpoint.clone();
71            api.patch(
72                &name,
73                &PatchParams::apply(FIELD_MANAGER).force(),
74                &Patch::Apply(endpoint),
75            )
76            .await
77        })
78        .await?;
79
80        Ok(())
81    }
82
83    async fn delete_vpc_endpoint(&self, id: CatalogItemId) -> Result<(), anyhow::Error> {
84        call_api(&self.vpc_endpoint_api, async |api| {
85            match api
86                .delete(
87                    &mz_cloud_resources::vpc_endpoint_name(id),
88                    &DeleteParams::default(),
89                )
90                .await
91            {
92                Ok(_) => Ok(()),
93                // Ignore already deleted endpoints.
94                Err(kube::Error::Api(resp)) if resp.code == 404 => Ok(()),
95                Err(e) => Err(e.into()),
96            }
97        })
98        .await
99    }
100
101    async fn list_vpc_endpoints(
102        &self,
103    ) -> Result<BTreeMap<CatalogItemId, VpcEndpointStatus>, anyhow::Error> {
104        let objects = call_api(&self.vpc_endpoint_api, async |api| {
105            api.list(&ListParams::default()).await
106        })
107        .await?;
108
109        let mut endpoints = BTreeMap::new();
110        for object in objects {
111            let id = match mz_cloud_resources::id_from_vpc_endpoint_name(&object.name_any()) {
112                Some(id) => id,
113                // Ignore any object whose name can't be parsed as a GlobalId
114                None => continue,
115            };
116            endpoints.insert(id, object.status.unwrap_or_default());
117        }
118        Ok(endpoints)
119    }
120
121    async fn watch_vpc_endpoints(&self) -> BoxStream<'static, VpcEndpointEvent> {
122        let stream = watcher(
123            self.vpc_endpoint_api.clone(),
124            // This watcher timeout must be shorter than the client read timeout.
125            watcher::Config::default().timeout(59),
126        )
127        .touched_objects()
128        .filter_map(|object| async move {
129            match object {
130                Ok(vpce) => {
131                    let connection_id =
132                        mz_cloud_resources::id_from_vpc_endpoint_name(&vpce.name_any())?;
133
134                    if let Some(state) = vpce.status.as_ref().and_then(|st| st.state.to_owned()) {
135                        Some(VpcEndpointEvent {
136                            connection_id,
137                            status: state,
138                            // Use the 'Available' Condition on the VPCE Status to set the event-time, falling back
139                            // to now if it's not set
140                            time: vpce
141                                .status
142                                .unwrap()
143                                .conditions
144                                .and_then(|c| c.into_iter().find(|c| &c.type_ == "Available"))
145                                .and_then(|condition| Some(condition.last_transition_time.0))
146                                .unwrap_or_else(Utc::now),
147                        })
148                    } else {
149                        // The Status/State is not yet populated on the VpcEndpoint, which means it was just
150                        // initialized and hasn't yet been reconciled by the environment-controller
151                        // We return an event with an 'unknown' state so that watchers know the VpcEndpoint was created
152                        // even if we don't yet have an accurate status
153                        Some(VpcEndpointEvent {
154                            connection_id,
155                            status: VpcEndpointState::Unknown,
156                            time: vpce.creation_timestamp()?.0,
157                        })
158                    }
159                    // TODO: Should we also check for the deletion_timestamp on the vpce? That would indicate that the
160                    // resource is about to be deleted; however there is already a 'deleted' enum val on VpcEndpointState
161                    // which refers to the state of the customer's VPC Endpoint Service, so we'd need to introduce a new state val
162                }
163                Err(error) => {
164                    // We assume that errors returned by Kubernetes are usually transient, so we
165                    // just log a warning and ignore them otherwise.
166                    tracing::warn!("vpc endpoint watch error: {error}");
167                    None
168                }
169            }
170        });
171        Box::pin(stream)
172    }
173
174    fn reader(&self) -> Arc<dyn CloudResourceReader> {
175        let reader = Arc::clone(&self.resource_reader);
176        reader
177    }
178}
179
180#[async_trait]
181impl CloudResourceReader for KubernetesOrchestrator {
182    async fn read(&self, id: CatalogItemId) -> Result<VpcEndpointStatus, anyhow::Error> {
183        self.resource_reader.read(id).await
184    }
185}
186
/// Reads cloud resources managed by a [`KubernetesOrchestrator`].
#[derive(Debug)]
pub struct KubernetesResourceReader {
    /// Kubernetes API handle used to fetch `VpcEndpoint` objects.
    vpc_endpoint_api: Api<VpcEndpoint>,
}
192
193impl KubernetesResourceReader {
194    /// Constructs a new Kubernetes cloud resource reader.
195    ///
196    /// The `context` parameter works like
197    /// [`KubernetesOrchestratorConfig::context`](crate::KubernetesOrchestratorConfig::context).
198    pub async fn new(context: String) -> Result<KubernetesResourceReader, anyhow::Error> {
199        let (client, _) = util::create_client(context).await?;
200        let vpc_endpoint_api: Api<VpcEndpoint> = Api::default_namespaced(client);
201        Ok(KubernetesResourceReader { vpc_endpoint_api })
202    }
203}
204
205#[async_trait]
206impl CloudResourceReader for KubernetesResourceReader {
207    async fn read(&self, id: CatalogItemId) -> Result<VpcEndpointStatus, anyhow::Error> {
208        let name = mz_cloud_resources::vpc_endpoint_name(id);
209        let endpoint = call_api(&self.vpc_endpoint_api, |api| api.get(&name)).await?;
210        Ok(endpoint.status.unwrap_or_default())
211    }
212}
213
214/// Helper for calling a kube API with (limited) retry.
215async fn call_api<'a, K, F, U, T, E>(api: &'a Api<K>, f: F) -> Result<T, E>
216where
217    F: Fn(&'a Api<K>) -> U + 'a,
218    U: Future<Output = Result<T, E>>,
219    E: std::fmt::Display,
220{
221    Retry::default()
222        .clamp_backoff(Duration::from_secs(10))
223        .max_duration(Duration::from_secs(60 * 10))
224        .retry_async(|_| f(api).inspect_err(|e| warn!("VPC endpoint API call failed: {e}")))
225        .await
226}