mz_cloud_resources/
vpc_endpoint.rs1use std::collections::BTreeMap;
11use std::convert::Infallible;
12use std::fmt::{self, Debug};
13use std::str::FromStr;
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use chrono::DateTime;
18use futures::stream::BoxStream;
19use k8s_openapi::jiff::Timestamp;
20use mz_repr::CatalogItemId;
21use mz_repr::Row;
22use serde::{Deserialize, Serialize};
23
24use crate::crd::vpc_endpoint::v1::{VpcEndpointState, VpcEndpointStatus};
25
/// A prefix for AWS external IDs, stored as an opaque string.
///
/// The wrapped string is private. In the code visible here, values are built
/// with [`AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable`]
/// (or via deserialization) and rendered through the [`fmt::Display`]
/// implementation.
// NOTE(review): how the prefix is combined into a complete external ID is not
// visible in this file — confirm against the callers.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct AwsExternalIdPrefix(String);
42
43impl AwsExternalIdPrefix {
44 pub fn new_from_cli_argument_or_environment_variable(
52 aws_external_id_prefix: &str,
53 ) -> Result<AwsExternalIdPrefix, Infallible> {
54 Ok(AwsExternalIdPrefix(aws_external_id_prefix.into()))
55 }
56}
57
58impl fmt::Display for AwsExternalIdPrefix {
59 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
60 f.write_str(&self.0)
61 }
62}
63
/// Configuration for a VPC endpoint, as passed to
/// [`CloudResourceController::ensure_vpc_endpoint`].
pub struct VpcEndpointConfig {
    /// The name of the AWS service associated with the endpoint.
    // NOTE(review): presumably the service the endpoint connects to — confirm.
    pub aws_service_name: String,
    /// The availability zone IDs associated with the endpoint.
    // NOTE(review): whether these are the zones the service is offered in or
    // the zones the endpoint is created in is not visible here — confirm.
    pub availability_zone_ids: Vec<String>,
}
71
/// Read/write interface for managing VPC endpoints, keyed by
/// [`CatalogItemId`].
///
/// Every controller is also a [`CloudResourceReader`] (supertrait bound).
// NOTE(review): the method descriptions below are inferred from names and
// signatures only; no implementation is visible in this file — confirm
// against the concrete controller.
#[async_trait]
pub trait CloudResourceController: CloudResourceReader {
    /// Ensures a VPC endpoint exists for `id` with the configuration in
    /// `vpc_endpoint`.
    // NOTE(review): presumably create-or-update semantics — confirm.
    async fn ensure_vpc_endpoint(
        &self,
        id: CatalogItemId,
        vpc_endpoint: VpcEndpointConfig,
    ) -> Result<(), anyhow::Error>;

    /// Deletes the VPC endpoint associated with `id`.
    // NOTE(review): behavior when no such endpoint exists is not visible
    // here — confirm.
    async fn delete_vpc_endpoint(&self, id: CatalogItemId) -> Result<(), anyhow::Error>;

    /// Lists VPC endpoints, returning each endpoint's [`VpcEndpointStatus`]
    /// keyed by the [`CatalogItemId`] it belongs to.
    async fn list_vpc_endpoints(
        &self,
    ) -> Result<BTreeMap<CatalogItemId, VpcEndpointStatus>, anyhow::Error>;

    /// Returns a `'static` stream of [`VpcEndpointEvent`]s.
    // NOTE(review): presumably one event per observed endpoint state
    // change — confirm.
    async fn watch_vpc_endpoints(&self) -> BoxStream<'static, VpcEndpointEvent>;

    /// Returns a shared, read-only handle to the same resources as a
    /// [`CloudResourceReader`] trait object.
    fn reader(&self) -> Arc<dyn CloudResourceReader>;
}
95
/// Read-only interface for looking up the status of a VPC endpoint.
///
/// Implementations must be [`Debug`], [`Send`] and [`Sync`].
#[async_trait]
pub trait CloudResourceReader: Debug + Send + Sync {
    /// Reads the [`VpcEndpointStatus`] of the VPC endpoint associated with
    /// `id`.
    // NOTE(review): presumably returns an error when the endpoint does not
    // exist — confirm against the implementation.
    async fn read(&self, id: CatalogItemId) -> Result<VpcEndpointStatus, anyhow::Error>;
}
101
102pub fn vpc_endpoint_name(id: CatalogItemId) -> String {
104 format!("connection-{id}")
107}
108
109pub fn id_from_vpc_endpoint_name(vpc_endpoint_name: &str) -> Option<CatalogItemId> {
111 vpc_endpoint_name
112 .split_once('-')
113 .and_then(|(_, id_str)| CatalogItemId::from_str(id_str).ok())
114}
115
116pub fn vpc_endpoint_host(id: CatalogItemId, availability_zone: Option<&str>) -> String {
119 let name = vpc_endpoint_name(id);
120 match availability_zone {
123 Some(az) => format!("{name}-{az}"),
124 None => name,
125 }
126}
127
/// An event describing the state of a VPC endpoint at a point in time.
///
/// Produced by [`CloudResourceController::watch_vpc_endpoints`] and
/// convertible into a [`Row`] via the `From` implementation.
#[derive(Debug)]
pub struct VpcEndpointEvent {
    /// The ID of the connection the VPC endpoint belongs to.
    pub connection_id: CatalogItemId,
    /// The state of the VPC endpoint reported by this event.
    pub status: VpcEndpointState,
    /// The time attached to this event.
    // NOTE(review): presumably when the state transition was observed —
    // confirm against the producer of these events.
    pub time: Timestamp,
}
134
135impl From<VpcEndpointEvent> for Row {
136 fn from(value: VpcEndpointEvent) -> Self {
137 use mz_repr::Datum;
138
139 Row::pack_slice(&[
140 Datum::TimestampTz(
141 DateTime::from_timestamp_nanos(
142 value.time.as_nanosecond().try_into().expect("must fit"),
143 )
144 .try_into()
145 .expect("must fit"),
146 ),
147 Datum::String(&value.connection_id.to_string()),
148 Datum::String(&value.status.to_string()),
149 ])
150 }
151}