mz_cloud_resources/
vpc_endpoint.rs1use std::collections::BTreeMap;
11use std::convert::Infallible;
12use std::fmt::{self, Debug};
13use std::str::FromStr;
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use chrono::{DateTime, Utc};
18use futures::stream::BoxStream;
19use mz_repr::CatalogItemId;
20use mz_repr::Row;
21use serde::{Deserialize, Serialize};
22
23use crate::crd::vpc_endpoint::v1::{VpcEndpointState, VpcEndpointStatus};
24
25#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
40pub struct AwsExternalIdPrefix(String);
41
42impl AwsExternalIdPrefix {
43 pub fn new_from_cli_argument_or_environment_variable(
51 aws_external_id_prefix: &str,
52 ) -> Result<AwsExternalIdPrefix, Infallible> {
53 Ok(AwsExternalIdPrefix(aws_external_id_prefix.into()))
54 }
55}
56
57impl fmt::Display for AwsExternalIdPrefix {
58 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
59 f.write_str(&self.0)
60 }
61}
62
/// Configuration for a VPC endpoint.
///
/// Derives `Debug` so that values can be logged and embedded in error
/// context, and `Clone`/`PartialEq`/`Eq` so that callers can keep a copy of
/// the configuration and compare a desired configuration against another one.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct VpcEndpointConfig {
    /// Name of the AWS service.
    ///
    /// NOTE(review): presumably the service the endpoint connects to —
    /// confirm against the controller implementation.
    pub aws_service_name: String,
    /// Availability zone IDs associated with the endpoint.
    pub availability_zone_ids: Vec<String>,
}
70
71#[async_trait]
72pub trait CloudResourceController: CloudResourceReader {
73 async fn ensure_vpc_endpoint(
75 &self,
76 id: CatalogItemId,
77 vpc_endpoint: VpcEndpointConfig,
78 ) -> Result<(), anyhow::Error>;
79
80 async fn delete_vpc_endpoint(&self, id: CatalogItemId) -> Result<(), anyhow::Error>;
82
83 async fn list_vpc_endpoints(
85 &self,
86 ) -> Result<BTreeMap<CatalogItemId, VpcEndpointStatus>, anyhow::Error>;
87
88 async fn watch_vpc_endpoints(&self) -> BoxStream<'static, VpcEndpointEvent>;
90
91 fn reader(&self) -> Arc<dyn CloudResourceReader>;
93}
94
95#[async_trait]
96pub trait CloudResourceReader: Debug + Send + Sync {
97 async fn read(&self, id: CatalogItemId) -> Result<VpcEndpointStatus, anyhow::Error>;
99}
100
101pub fn vpc_endpoint_name(id: CatalogItemId) -> String {
103 format!("connection-{id}")
106}
107
108pub fn id_from_vpc_endpoint_name(vpc_endpoint_name: &str) -> Option<CatalogItemId> {
110 vpc_endpoint_name
111 .split_once('-')
112 .and_then(|(_, id_str)| CatalogItemId::from_str(id_str).ok())
113}
114
115pub fn vpc_endpoint_host(id: CatalogItemId, availability_zone: Option<&str>) -> String {
118 let name = vpc_endpoint_name(id);
119 match availability_zone {
122 Some(az) => format!("{name}-{az}"),
123 None => name,
124 }
125}
126
127#[derive(Debug)]
128pub struct VpcEndpointEvent {
129 pub connection_id: CatalogItemId,
130 pub status: VpcEndpointState,
131 pub time: DateTime<Utc>,
132}
133
134impl From<VpcEndpointEvent> for Row {
135 fn from(value: VpcEndpointEvent) -> Self {
136 use mz_repr::Datum;
137
138 Row::pack_slice(&[
139 Datum::TimestampTz(value.time.try_into().expect("must fit")),
140 Datum::String(&value.connection_id.to_string()),
141 Datum::String(&value.status.to_string()),
142 ])
143 }
144}