1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
// Copyright Materialize, Inc. and contributors. All rights reserved.
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Management of K8S objects, such as VpcEndpoints.
use std::collections::BTreeMap;
use std::sync::Arc;
use async_trait::async_trait;
use chrono::Utc;
use futures::stream::BoxStream;
use futures::StreamExt;
use kube::api::{DeleteParams, ListParams, ObjectMeta, Patch, PatchParams};
use kube::runtime::{watcher, WatchStreamExt};
use kube::{Api, ResourceExt};
use maplit::btreemap;
use mz_repr::CatalogItemId;
use mz_cloud_resources::crd::vpc_endpoint::v1::{
VpcEndpoint, VpcEndpointSpec, VpcEndpointState, VpcEndpointStatus,
use mz_cloud_resources::{
CloudResourceController, CloudResourceReader, VpcEndpointConfig, VpcEndpointEvent,
use crate::{util, KubernetesOrchestrator, FIELD_MANAGER};
impl CloudResourceController for KubernetesOrchestrator {
async fn ensure_vpc_endpoint(
id: CatalogItemId,
config: VpcEndpointConfig,
) -> Result<(), anyhow::Error> {
let name = mz_cloud_resources::vpc_endpoint_name(id);
let mut labels = btreemap! {
"".to_owned() => id.to_string(),
for (key, value) in &self.config.service_labels {
labels.insert(key.clone(), value.clone());
let vpc_endpoint = VpcEndpoint {
metadata: ObjectMeta {
labels: Some(labels),
name: Some(name.clone()),
namespace: Some(self.kubernetes_namespace.clone()),
spec: VpcEndpointSpec {
aws_service_name: config.aws_service_name,
availability_zone_ids: config.availability_zone_ids,
role_suffix: match &self.config.aws_external_id_prefix {
None => id.to_string(),
Some(external_id) => format!("{external_id}_{id}"),
status: None,
async fn delete_vpc_endpoint(&self, id: CatalogItemId) -> Result<(), anyhow::Error> {
match self
Ok(_) => Ok(()),
// Ignore already deleted endpoints.
Err(kube::Error::Api(resp)) if resp.code == 404 => Ok(()),
Err(e) => Err(e.into()),
async fn list_vpc_endpoints(
) -> Result<BTreeMap<CatalogItemId, VpcEndpointStatus>, anyhow::Error> {
let objects = self.vpc_endpoint_api.list(&ListParams::default()).await?;
let mut endpoints = BTreeMap::new();
for object in objects {
let id = match mz_cloud_resources::id_from_vpc_endpoint_name(&object.name_any()) {
Some(id) => id,
// Ignore any object whose name can't be parsed as a GlobalId
None => continue,
endpoints.insert(id, object.status.unwrap_or_default());
async fn watch_vpc_endpoints(&self) -> BoxStream<'static, VpcEndpointEvent> {
let stream = watcher(self.vpc_endpoint_api.clone(), watcher::Config::default())
.filter_map(|object| async move {
match object {
Ok(vpce) => {
let connection_id =
if let Some(state) = vpce.status.as_ref().and_then(|st| st.state.to_owned())
Some(VpcEndpointEvent {
status: state,
// Use the 'Available' Condition on the VPCE Status to set the event-time, falling back
// to now if it's not set
time: vpce
.and_then(|c| c.into_iter().find(|c| &c.type_ == "Available"))
.and_then(|condition| Some(condition.last_transition_time.0))
} else {
// The Status/State is not yet populated on the VpcEndpoint, which means it was just
// initialized and hasn't yet been reconciled by the environment-controller
// We return an event with an 'unknown' state so that watchers know the VpcEndpoint was created
// even if we don't yet have an accurate status
Some(VpcEndpointEvent {
status: VpcEndpointState::Unknown,
time: vpce.creation_timestamp()?.0,
// TODO: Should we also check for the deletion_timestamp on the vpce? That would indicate that the
// resource is about to be deleted; however there is already a 'deleted' enum val on VpcEndpointState
// which refers to the state of the customer's VPC Endpoint Service, so we'd need to introduce a new state val
Err(error) => {
// We assume that errors returned by Kubernetes are usually transient, so we
// just log a warning and ignore them otherwise.
tracing::warn!("vpc endpoint watch error: {error}");
fn reader(&self) -> Arc<dyn CloudResourceReader> {
let reader = Arc::clone(&self.resource_reader);
impl CloudResourceReader for KubernetesOrchestrator {
async fn read(&self, id: CatalogItemId) -> Result<VpcEndpointStatus, anyhow::Error> {
/// Reads cloud resources managed by a [`KubernetesOrchestrator`].
pub struct KubernetesResourceReader {
vpc_endpoint_api: Api<VpcEndpoint>,
impl KubernetesResourceReader {
/// Constructs a new Kubernetes cloud resource reader.
/// The `context` parameter works like
/// [`KubernetesOrchestratorConfig::context`](crate::KubernetesOrchestratorConfig::context).
pub async fn new(context: String) -> Result<KubernetesResourceReader, anyhow::Error> {
let (client, _) = util::create_client(context).await?;
let vpc_endpoint_api: Api<VpcEndpoint> = Api::default_namespaced(client);
Ok(KubernetesResourceReader { vpc_endpoint_api })
impl CloudResourceReader for KubernetesResourceReader {
async fn read(&self, id: CatalogItemId) -> Result<VpcEndpointStatus, anyhow::Error> {
let name = mz_cloud_resources::vpc_endpoint_name(id);
let endpoint = self.vpc_endpoint_api.get(&name).await?;