1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Abstractions for management of cloud resources that have no equivalent when running
//! locally, like AWS PrivateLink endpoints.
use std::collections::BTreeMap;
use std::fmt::{self, Debug};
use std::str::FromStr;
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use mz_repr::CatalogItemId;
use mz_repr::Row;
use serde::{Deserialize, Serialize};
use crate::crd::vpc_endpoint::v1::{VpcEndpointState, VpcEndpointStatus};
pub mod crd;
/// A prefix for an [external ID] to use for all AWS AssumeRole operations. It
/// should be concatenanted with a non-user-provided suffix identifying the
/// source or sink. The ID used for the suffix should never be reused if the
/// source or sink is deleted.
///
/// **WARNING:** it is critical for security that this ID is **not** provided by
/// end users of Materialize. It must be provided by the operator of the
/// Materialize service.
///
/// This type protects against accidental construction of an
/// `AwsExternalIdPrefix` through the use of an unwieldy and overly descriptive
/// constructor method name.
///
/// [external ID]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct AwsExternalIdPrefix(String);
impl AwsExternalIdPrefix {
/// Creates a new AWS external ID prefix from a command-line argument or
/// an environment variable.
///
/// **WARNING:** it is critical for security that this ID is **not**
/// provided by end users of Materialize. It must be provided by the
/// operator of the Materialize service.
///
pub fn new_from_cli_argument_or_environment_variable(
aws_external_id_prefix: &str,
) -> AwsExternalIdPrefix {
AwsExternalIdPrefix(aws_external_id_prefix.into())
}
}
impl fmt::Display for AwsExternalIdPrefix {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(&self.0)
}
}
/// Configures a VPC endpoint.
pub struct VpcEndpointConfig {
/// The name of the service to connect to.
pub aws_service_name: String,
/// The IDs of the availability zones in which the service is available.
pub availability_zone_ids: Vec<String>,
}
#[async_trait]
pub trait CloudResourceController: CloudResourceReader {
/// Creates or updates the specified `VpcEndpoint` Kubernetes object.
async fn ensure_vpc_endpoint(
&self,
id: CatalogItemId,
vpc_endpoint: VpcEndpointConfig,
) -> Result<(), anyhow::Error>;
/// Deletes the specified `VpcEndpoint` Kubernetes object.
async fn delete_vpc_endpoint(&self, id: CatalogItemId) -> Result<(), anyhow::Error>;
/// Lists existing `VpcEndpoint` Kubernetes objects.
async fn list_vpc_endpoints(
&self,
) -> Result<BTreeMap<CatalogItemId, VpcEndpointStatus>, anyhow::Error>;
/// Lists existing `VpcEndpoint` Kubernetes objects.
async fn watch_vpc_endpoints(&self) -> BoxStream<'static, VpcEndpointEvent>;
/// Returns a reader for the resources managed by this controller.
fn reader(&self) -> Arc<dyn CloudResourceReader>;
}
#[async_trait]
pub trait CloudResourceReader: Debug + Send + Sync {
/// Reads the specified `VpcEndpoint` Kubernetes object.
async fn read(&self, id: CatalogItemId) -> Result<VpcEndpointStatus, anyhow::Error>;
}
/// Returns the name to use for the VPC endpoint with the given ID.
pub fn vpc_endpoint_name(id: CatalogItemId) -> String {
// This is part of the contract with the VpcEndpointController in the
// cloud infrastructure layer.
format!("connection-{id}")
}
/// Attempts to return the ID used to create the given VPC endpoint name.
pub fn id_from_vpc_endpoint_name(vpc_endpoint_name: &str) -> Option<CatalogItemId> {
vpc_endpoint_name
.split_once('-')
.and_then(|(_, id_str)| CatalogItemId::from_str(id_str).ok())
}
/// Returns the host to use for the VPC endpoint with the given ID and
/// optionally in the given availability zone.
pub fn vpc_endpoint_host(id: CatalogItemId, availability_zone: Option<&str>) -> String {
let name = vpc_endpoint_name(id);
// This naming scheme is part of the contract with the VpcEndpointController
// in the cloud infrastructure layer.
match availability_zone {
Some(az) => format!("{name}-{az}"),
None => name,
}
}
#[derive(Debug)]
pub struct VpcEndpointEvent {
pub connection_id: CatalogItemId,
pub status: VpcEndpointState,
pub time: DateTime<Utc>,
}
impl From<VpcEndpointEvent> for Row {
fn from(value: VpcEndpointEvent) -> Self {
use mz_repr::Datum;
Row::pack_slice(&[
Datum::TimestampTz(value.time.try_into().expect("must fit")),
Datum::String(&value.connection_id.to_string()),
Datum::String(&value.status.to_string()),
])
}
}