mz_cloud_resources/
vpc_endpoint.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::convert::Infallible;
12use std::fmt::{self, Debug};
13use std::str::FromStr;
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use chrono::{DateTime, Utc};
18use futures::stream::BoxStream;
19use mz_repr::CatalogItemId;
20use mz_repr::Row;
21use serde::{Deserialize, Serialize};
22
23use crate::crd::vpc_endpoint::v1::{VpcEndpointState, VpcEndpointStatus};
24
/// A prefix for an [external ID] to use for all AWS AssumeRole operations. It
/// should be concatenated with a non-user-provided suffix identifying the
/// source or sink. The ID used for the suffix should never be reused if the
/// source or sink is deleted.
///
/// **WARNING:** it is critical for security that this ID is **not** provided by
/// end users of Materialize. It must be provided by the operator of the
/// Materialize service.
///
/// This type protects against accidental construction of an
/// `AwsExternalIdPrefix` through the use of an unwieldy and overly descriptive
/// constructor method name.
///
/// The prefix is held as an opaque string. Its `Display` implementation
/// writes that string out unchanged.
///
/// [external ID]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct AwsExternalIdPrefix(String);
41
42impl AwsExternalIdPrefix {
43    /// Creates a new AWS external ID prefix from a command-line argument or
44    /// an environment variable.
45    ///
46    /// **WARNING:** it is critical for security that this ID is **not**
47    /// provided by end users of Materialize. It must be provided by the
48    /// operator of the Materialize service.
49    ///
50    pub fn new_from_cli_argument_or_environment_variable(
51        aws_external_id_prefix: &str,
52    ) -> Result<AwsExternalIdPrefix, Infallible> {
53        Ok(AwsExternalIdPrefix(aws_external_id_prefix.into()))
54    }
55}
56
57impl fmt::Display for AwsExternalIdPrefix {
58    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
59        f.write_str(&self.0)
60    }
61}
62
/// Configures a VPC endpoint.
///
/// Passed to [`CloudResourceController::ensure_vpc_endpoint`] to describe the
/// endpoint that should exist.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct VpcEndpointConfig {
    /// The name of the service to connect to.
    pub aws_service_name: String,
    /// The IDs of the availability zones in which the service is available.
    pub availability_zone_ids: Vec<String>,
}
70
/// A controller that manages `VpcEndpoint` Kubernetes objects.
///
/// Extends [`CloudResourceReader`] with operations that create, update,
/// delete, list, and watch VPC endpoints.
#[async_trait]
pub trait CloudResourceController: CloudResourceReader {
    /// Creates or updates the specified `VpcEndpoint` Kubernetes object.
    async fn ensure_vpc_endpoint(
        &self,
        id: CatalogItemId,
        vpc_endpoint: VpcEndpointConfig,
    ) -> Result<(), anyhow::Error>;

    /// Deletes the specified `VpcEndpoint` Kubernetes object.
    async fn delete_vpc_endpoint(&self, id: CatalogItemId) -> Result<(), anyhow::Error>;

    /// Lists existing `VpcEndpoint` Kubernetes objects.
    ///
    /// On success, returns a map from each endpoint's ID to its
    /// `VpcEndpointStatus`.
    async fn list_vpc_endpoints(
        &self,
    ) -> Result<BTreeMap<CatalogItemId, VpcEndpointStatus>, anyhow::Error>;

    /// Watches `VpcEndpoint` Kubernetes objects.
    ///
    /// Returns a `'static` stream of [`VpcEndpointEvent`]s.
    // NOTE(review): presumably one event is yielded per observed state
    // change; confirm against the implementations of this trait.
    async fn watch_vpc_endpoints(&self) -> BoxStream<'static, VpcEndpointEvent>;

    /// Returns a reader for the resources managed by this controller.
    fn reader(&self) -> Arc<dyn CloudResourceReader>;
}
94
/// Read-only access to `VpcEndpoint` Kubernetes objects.
///
/// Implementations must be `Debug + Send + Sync`; the controller hands
/// readers out as `Arc<dyn CloudResourceReader>`.
#[async_trait]
pub trait CloudResourceReader: Debug + Send + Sync {
    /// Reads the specified `VpcEndpoint` Kubernetes object.
    ///
    /// On success, returns the `VpcEndpointStatus` of the endpoint identified
    /// by `id`.
    async fn read(&self, id: CatalogItemId) -> Result<VpcEndpointStatus, anyhow::Error>;
}
100
101/// Returns the name to use for the VPC endpoint with the given ID.
102pub fn vpc_endpoint_name(id: CatalogItemId) -> String {
103    // This is part of the contract with the VpcEndpointController in the
104    // cloud infrastructure layer.
105    format!("connection-{id}")
106}
107
108/// Attempts to return the ID used to create the given VPC endpoint name.
109pub fn id_from_vpc_endpoint_name(vpc_endpoint_name: &str) -> Option<CatalogItemId> {
110    vpc_endpoint_name
111        .split_once('-')
112        .and_then(|(_, id_str)| CatalogItemId::from_str(id_str).ok())
113}
114
115/// Returns the host to use for the VPC endpoint with the given ID and
116/// optionally in the given availability zone.
117pub fn vpc_endpoint_host(id: CatalogItemId, availability_zone: Option<&str>) -> String {
118    let name = vpc_endpoint_name(id);
119    // This naming scheme is part of the contract with the VpcEndpointController
120    // in the cloud infrastructure layer.
121    match availability_zone {
122        Some(az) => format!("{name}-{az}"),
123        None => name,
124    }
125}
126
/// An event describing a VPC endpoint, as yielded by
/// [`CloudResourceController::watch_vpc_endpoints`].
///
/// Can be converted into a [`Row`] whose columns are, in order: `time`,
/// `connection_id` (as a string), and `status` (as a string).
#[derive(Debug)]
pub struct VpcEndpointEvent {
    /// The ID of the catalog item the event refers to.
    // NOTE(review): presumably always the ID of a connection, per the field
    // name; confirm against the producers of this event.
    pub connection_id: CatalogItemId,
    /// The state of the VPC endpoint reported by this event.
    pub status: VpcEndpointState,
    /// The UTC timestamp attached to this event.
    // NOTE(review): presumably the time the state was observed; confirm
    // against the producers of this event.
    pub time: DateTime<Utc>,
}
133
134impl From<VpcEndpointEvent> for Row {
135    fn from(value: VpcEndpointEvent) -> Self {
136        use mz_repr::Datum;
137
138        Row::pack_slice(&[
139            Datum::TimestampTz(value.time.try_into().expect("must fit")),
140            Datum::String(&value.connection_id.to_string()),
141            Datum::String(&value.status.to_string()),
142        ])
143    }
144}