Skip to main content

mz_cloud_resources/
vpc_endpoint.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::convert::Infallible;
12use std::fmt::{self, Debug};
13use std::str::FromStr;
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use chrono::DateTime;
18use futures::stream::BoxStream;
19use k8s_openapi::jiff::Timestamp;
20use mz_repr::CatalogItemId;
21use mz_repr::Row;
22use serde::{Deserialize, Serialize};
23
24use crate::crd::vpc_endpoint::v1::{VpcEndpointState, VpcEndpointStatus};
25
26/// A prefix for an [external ID] to use for all AWS AssumeRole operations. It
27/// should be concatenanted with a non-user-provided suffix identifying the
28/// source or sink. The ID used for the suffix should never be reused if the
29/// source or sink is deleted.
30///
31/// **WARNING:** it is critical for security that this ID is **not** provided by
32/// end users of Materialize. It must be provided by the operator of the
33/// Materialize service.
34///
35/// This type protects against accidental construction of an
36/// `AwsExternalIdPrefix` through the use of an unwieldy and overly descriptive
37/// constructor method name.
38///
39/// [external ID]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html
40#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
41pub struct AwsExternalIdPrefix(String);
42
43impl AwsExternalIdPrefix {
44    /// Creates a new AWS external ID prefix from a command-line argument or
45    /// an environment variable.
46    ///
47    /// **WARNING:** it is critical for security that this ID is **not**
48    /// provided by end users of Materialize. It must be provided by the
49    /// operator of the Materialize service.
50    ///
51    pub fn new_from_cli_argument_or_environment_variable(
52        aws_external_id_prefix: &str,
53    ) -> Result<AwsExternalIdPrefix, Infallible> {
54        Ok(AwsExternalIdPrefix(aws_external_id_prefix.into()))
55    }
56}
57
58impl fmt::Display for AwsExternalIdPrefix {
59    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
60        f.write_str(&self.0)
61    }
62}
63
64/// Configures a VPC endpoint.
65pub struct VpcEndpointConfig {
66    /// The name of the service to connect to.
67    pub aws_service_name: String,
68    /// The IDs of the availability zones in which the service is available.
69    pub availability_zone_ids: Vec<String>,
70}
71
72#[async_trait]
73pub trait CloudResourceController: CloudResourceReader {
74    /// Creates or updates the specified `VpcEndpoint` Kubernetes object.
75    async fn ensure_vpc_endpoint(
76        &self,
77        id: CatalogItemId,
78        vpc_endpoint: VpcEndpointConfig,
79    ) -> Result<(), anyhow::Error>;
80
81    /// Deletes the specified `VpcEndpoint` Kubernetes object.
82    async fn delete_vpc_endpoint(&self, id: CatalogItemId) -> Result<(), anyhow::Error>;
83
84    /// Lists existing `VpcEndpoint` Kubernetes objects.
85    async fn list_vpc_endpoints(
86        &self,
87    ) -> Result<BTreeMap<CatalogItemId, VpcEndpointStatus>, anyhow::Error>;
88
89    /// Lists existing `VpcEndpoint` Kubernetes objects.
90    async fn watch_vpc_endpoints(&self) -> BoxStream<'static, VpcEndpointEvent>;
91
92    /// Returns a reader for the resources managed by this controller.
93    fn reader(&self) -> Arc<dyn CloudResourceReader>;
94}
95
96#[async_trait]
97pub trait CloudResourceReader: Debug + Send + Sync {
98    /// Reads the specified `VpcEndpoint` Kubernetes object.
99    async fn read(&self, id: CatalogItemId) -> Result<VpcEndpointStatus, anyhow::Error>;
100}
101
102/// Returns the name to use for the VPC endpoint with the given ID.
103pub fn vpc_endpoint_name(id: CatalogItemId) -> String {
104    // This is part of the contract with the VpcEndpointController in the
105    // cloud infrastructure layer.
106    format!("connection-{id}")
107}
108
109/// Attempts to return the ID used to create the given VPC endpoint name.
110pub fn id_from_vpc_endpoint_name(vpc_endpoint_name: &str) -> Option<CatalogItemId> {
111    vpc_endpoint_name
112        .split_once('-')
113        .and_then(|(_, id_str)| CatalogItemId::from_str(id_str).ok())
114}
115
116/// Returns the host to use for the VPC endpoint with the given ID and
117/// optionally in the given availability zone.
118pub fn vpc_endpoint_host(id: CatalogItemId, availability_zone: Option<&str>) -> String {
119    let name = vpc_endpoint_name(id);
120    // This naming scheme is part of the contract with the VpcEndpointController
121    // in the cloud infrastructure layer.
122    match availability_zone {
123        Some(az) => format!("{name}-{az}"),
124        None => name,
125    }
126}
127
128#[derive(Debug)]
129pub struct VpcEndpointEvent {
130    pub connection_id: CatalogItemId,
131    pub status: VpcEndpointState,
132    pub time: Timestamp,
133}
134
135impl From<VpcEndpointEvent> for Row {
136    fn from(value: VpcEndpointEvent) -> Self {
137        use mz_repr::Datum;
138
139        Row::pack_slice(&[
140            Datum::TimestampTz(
141                DateTime::from_timestamp_nanos(
142                    value.time.as_nanosecond().try_into().expect("must fit"),
143                )
144                .try_into()
145                .expect("must fit"),
146            ),
147            Datum::String(&value.connection_id.to_string()),
148            Datum::String(&value.status.to_string()),
149        ])
150    }
151}