1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910use std::collections::BTreeMap;
11use std::convert::Infallible;
12use std::fmt::{self, Debug};
13use std::str::FromStr;
14use std::sync::Arc;
1516use async_trait::async_trait;
17use chrono::{DateTime, Utc};
18use futures::stream::BoxStream;
19use mz_repr::CatalogItemId;
20use mz_repr::Row;
21use serde::{Deserialize, Serialize};
2223use crate::crd::vpc_endpoint::v1::{VpcEndpointState, VpcEndpointStatus};
2425/// A prefix for an [external ID] to use for all AWS AssumeRole operations. It
26/// should be concatenanted with a non-user-provided suffix identifying the
27/// source or sink. The ID used for the suffix should never be reused if the
28/// source or sink is deleted.
29///
30/// **WARNING:** it is critical for security that this ID is **not** provided by
31/// end users of Materialize. It must be provided by the operator of the
32/// Materialize service.
33///
34/// This type protects against accidental construction of an
35/// `AwsExternalIdPrefix` through the use of an unwieldy and overly descriptive
36/// constructor method name.
37///
38/// [external ID]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html
39#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
40pub struct AwsExternalIdPrefix(String);
4142impl AwsExternalIdPrefix {
43/// Creates a new AWS external ID prefix from a command-line argument or
44 /// an environment variable.
45 ///
46 /// **WARNING:** it is critical for security that this ID is **not**
47 /// provided by end users of Materialize. It must be provided by the
48 /// operator of the Materialize service.
49 ///
50pub fn new_from_cli_argument_or_environment_variable(
51 aws_external_id_prefix: &str,
52 ) -> Result<AwsExternalIdPrefix, Infallible> {
53Ok(AwsExternalIdPrefix(aws_external_id_prefix.into()))
54 }
55}
5657impl fmt::Display for AwsExternalIdPrefix {
58fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
59 f.write_str(&self.0)
60 }
61}
6263/// Configures a VPC endpoint.
64pub struct VpcEndpointConfig {
65/// The name of the service to connect to.
66pub aws_service_name: String,
67/// The IDs of the availability zones in which the service is available.
68pub availability_zone_ids: Vec<String>,
69}
7071#[async_trait]
72pub trait CloudResourceController: CloudResourceReader {
73/// Creates or updates the specified `VpcEndpoint` Kubernetes object.
74async fn ensure_vpc_endpoint(
75&self,
76 id: CatalogItemId,
77 vpc_endpoint: VpcEndpointConfig,
78 ) -> Result<(), anyhow::Error>;
7980/// Deletes the specified `VpcEndpoint` Kubernetes object.
81async fn delete_vpc_endpoint(&self, id: CatalogItemId) -> Result<(), anyhow::Error>;
8283/// Lists existing `VpcEndpoint` Kubernetes objects.
84async fn list_vpc_endpoints(
85&self,
86 ) -> Result<BTreeMap<CatalogItemId, VpcEndpointStatus>, anyhow::Error>;
8788/// Lists existing `VpcEndpoint` Kubernetes objects.
89async fn watch_vpc_endpoints(&self) -> BoxStream<'static, VpcEndpointEvent>;
9091/// Returns a reader for the resources managed by this controller.
92fn reader(&self) -> Arc<dyn CloudResourceReader>;
93}
9495#[async_trait]
96pub trait CloudResourceReader: Debug + Send + Sync {
97/// Reads the specified `VpcEndpoint` Kubernetes object.
98async fn read(&self, id: CatalogItemId) -> Result<VpcEndpointStatus, anyhow::Error>;
99}
100101/// Returns the name to use for the VPC endpoint with the given ID.
102pub fn vpc_endpoint_name(id: CatalogItemId) -> String {
103// This is part of the contract with the VpcEndpointController in the
104 // cloud infrastructure layer.
105format!("connection-{id}")
106}
107108/// Attempts to return the ID used to create the given VPC endpoint name.
109pub fn id_from_vpc_endpoint_name(vpc_endpoint_name: &str) -> Option<CatalogItemId> {
110 vpc_endpoint_name
111 .split_once('-')
112 .and_then(|(_, id_str)| CatalogItemId::from_str(id_str).ok())
113}
114115/// Returns the host to use for the VPC endpoint with the given ID and
116/// optionally in the given availability zone.
117pub fn vpc_endpoint_host(id: CatalogItemId, availability_zone: Option<&str>) -> String {
118let name = vpc_endpoint_name(id);
119// This naming scheme is part of the contract with the VpcEndpointController
120 // in the cloud infrastructure layer.
121match availability_zone {
122Some(az) => format!("{name}-{az}"),
123None => name,
124 }
125}
126127#[derive(Debug)]
128pub struct VpcEndpointEvent {
129pub connection_id: CatalogItemId,
130pub status: VpcEndpointState,
131pub time: DateTime<Utc>,
132}
133134impl From<VpcEndpointEvent> for Row {
135fn from(value: VpcEndpointEvent) -> Self {
136use mz_repr::Datum;
137138 Row::pack_slice(&[
139 Datum::TimestampTz(value.time.try_into().expect("must fit")),
140 Datum::String(&value.connection_id.to_string()),
141 Datum::String(&value.status.to_string()),
142 ])
143 }
144}