Skip to main content

mz_storage_types/
connections.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Connection types.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::net::SocketAddr;
15use std::sync::Arc;
16
17use anyhow::{Context, anyhow};
18use async_trait::async_trait;
19use aws_credential_types::provider::ProvideCredentials;
20use iceberg::Catalog;
21use iceberg::CatalogBuilder;
22use iceberg::io::{
23    AwsCredential, AwsCredentialLoad, CustomAwsCredentialLoader, S3_ACCESS_KEY_ID,
24    S3_DISABLE_EC2_METADATA, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use iceberg_catalog_rest::{
27    REST_CATALOG_PROP_URI, REST_CATALOG_PROP_WAREHOUSE, RestCatalogBuilder,
28};
29use itertools::Itertools;
30use mz_ccsr::tls::{Certificate, Identity};
31use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceReader, vpc_endpoint_host};
32use mz_dyncfg::ConfigSet;
33use mz_kafka_util::client::{
34    BrokerAddr, BrokerRewrite, MzClientContext, MzKafkaError, TunnelConfig, TunnelingClientContext,
35};
36use mz_mysql_util::{MySqlConn, MySqlError};
37use mz_ore::assert_none;
38use mz_ore::error::ErrorExt;
39use mz_ore::future::{InTask, OreFutureExt};
40use mz_ore::netio::resolve_address;
41use mz_ore::num::NonNeg;
42use mz_repr::{CatalogItemId, GlobalId};
43use mz_secrets::SecretsReader;
44use mz_ssh_util::keys::SshKeyPair;
45use mz_ssh_util::tunnel::SshTunnelConfig;
46use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager};
47use mz_tracing::CloneableEnvFilter;
48use rdkafka::ClientContext;
49use rdkafka::config::FromClientConfigAndContext;
50use rdkafka::consumer::{BaseConsumer, Consumer};
51use regex::Regex;
52use serde::{Deserialize, Deserializer, Serialize};
53use tokio::net;
54use tokio::runtime::Handle;
55use tokio_postgres::config::SslMode;
56use tracing::{debug, warn};
57use url::Url;
58
59use crate::AlterCompatible;
60use crate::configuration::StorageConfiguration;
61use crate::connections::aws::{
62    AwsAuth, AwsConnection, AwsConnectionReference, AwsConnectionValidationError,
63};
64use crate::connections::string_or_secret::StringOrSecret;
65use crate::controller::AlterError;
66use crate::dyncfgs::{
67    ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES,
68    KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM, KAFKA_RECONNECT_BACKOFF,
69    KAFKA_RECONNECT_BACKOFF_MAX, KAFKA_RETRY_BACKOFF, KAFKA_RETRY_BACKOFF_MAX,
70};
71use crate::errors::{ContextCreationError, CsrConnectError};
72
73pub mod aws;
74pub mod inline;
75pub mod string_or_secret;
76
/// Property key for the OAuth2 scope passed to a REST Iceberg catalog.
const REST_CATALOG_PROP_SCOPE: &str = "scope";
/// Property key for the OAuth2 credential (`CLIENT_ID:CLIENT_SECRET`) passed
/// to a REST Iceberg catalog.
const REST_CATALOG_PROP_CREDENTIAL: &str = "credential";
79
/// A credential loader that wraps an aws-sdk-rust credentials provider for use with
/// iceberg/OpenDAL. This allows us to provide refreshable credentials from the AWS SDK
/// credential chain (including the full assume role chain) to OpenDAL's S3 implementation.
///
/// We use this instead of OpenDAL's built-in assume role support because Materialize
/// has a runtime-defined credential chain (ambient → jump role → user role with external ID)
/// that can't be expressed via OpenDAL's static configuration properties.
struct AwsSdkCredentialLoader {
    /// The underlying AWS SDK credentials provider. For assume role auth, this provider
    /// already handles the full chain: ambient creds -> jump role -> user role.
    provider: aws_credential_types::provider::SharedCredentialsProvider,
}
92
impl AwsSdkCredentialLoader {
    /// Wraps the given AWS SDK credentials provider so it can serve as an
    /// iceberg/OpenDAL credential loader.
    fn new(provider: aws_credential_types::provider::SharedCredentialsProvider) -> Self {
        Self { provider }
    }
}
98
99#[async_trait]
100impl AwsCredentialLoad for AwsSdkCredentialLoader {
101    async fn load_credential(
102        &self,
103        _client: reqwest::Client,
104    ) -> anyhow::Result<Option<AwsCredential>> {
105        let creds = self
106            .provider
107            .provide_credentials()
108            .await
109            .map_err(|e| {
110                warn!(
111                    error = %e.display_with_causes(),
112                    "failed to load AWS credentials for Iceberg FileIO from SDK provider"
113                );
114                e
115            })
116            .context(
117                "failed to load AWS credentials from SDK provider for Iceberg FileIO \
118                 (credential source may be temporarily unavailable)",
119            )?;
120
121        Ok(Some(AwsCredential {
122            access_key_id: creds.access_key_id().to_string(),
123            secret_access_key: creds.secret_access_key().to_string(),
124            session_token: creds.session_token().map(|s| s.to_string()),
125            expires_in: creds.expiry().map(|t| t.into()),
126        }))
127    }
128}
129
/// An extension trait for [`SecretsReader`]
#[async_trait::async_trait]
trait SecretsReaderExt {
    /// `SecretsReader::read`, but optionally run in a task.
    ///
    /// Returns the raw secret bytes for `id`.
    async fn read_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<Vec<u8>, anyhow::Error>;

    /// `SecretsReader::read_string`, but optionally run in a task.
    ///
    /// Returns the secret for `id` decoded as a UTF-8 string.
    async fn read_string_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<String, anyhow::Error>;
}
147
148#[async_trait::async_trait]
149impl SecretsReaderExt for Arc<dyn SecretsReader> {
150    async fn read_in_task_if(
151        &self,
152        in_task: InTask,
153        id: CatalogItemId,
154    ) -> Result<Vec<u8>, anyhow::Error> {
155        let sr = Arc::clone(self);
156        async move { sr.read(id).await }
157            .run_in_task_if(in_task, || "secrets_reader_read".to_string())
158            .await
159    }
160    async fn read_string_in_task_if(
161        &self,
162        in_task: InTask,
163        id: CatalogItemId,
164    ) -> Result<String, anyhow::Error> {
165        let sr = Arc::clone(self);
166        async move { sr.read_string(id).await }
167            .run_in_task_if(in_task, || "secrets_reader_read".to_string())
168            .await
169    }
170}
171
/// Extra context to pass through when instantiating a connection for a source
/// or sink.
///
/// Should be kept cheaply cloneable.
#[derive(Debug, Clone)]
pub struct ConnectionContext {
    /// An opaque identifier for the environment in which this process is
    /// running.
    ///
    /// The storage layer is intentionally unaware of the structure within this
    /// identifier. Higher layers of the stack can make use of that structure,
    /// but the storage layer should be oblivious to it.
    pub environment_id: String,
    /// The level for librdkafka's logs.
    pub librdkafka_log_level: tracing::Level,
    /// A prefix for an external ID to use for all AWS AssumeRole operations.
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    /// The ARN for a Materialize-controlled role to assume before assuming
    /// a customer's requested role for an AWS connection.
    pub aws_connection_role_arn: Option<String>,
    /// A secrets reader.
    pub secrets_reader: Arc<dyn SecretsReader>,
    /// A cloud resource reader, if supported in this configuration.
    pub cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
    /// A manager for SSH tunnels.
    pub ssh_tunnel_manager: SshTunnelManager,
}
199
200impl ConnectionContext {
201    /// Constructs a new connection context from command line arguments.
202    ///
203    /// **WARNING:** it is critical for security that the `aws_external_id` be
204    /// provided by the operator of the Materialize service (i.e., via a CLI
205    /// argument or environment variable) and not the end user of Materialize
206    /// (e.g., via a configuration option in a SQL statement). See
207    /// [`AwsExternalIdPrefix`] for details.
208    pub fn from_cli_args(
209        environment_id: String,
210        startup_log_level: &CloneableEnvFilter,
211        aws_external_id_prefix: Option<AwsExternalIdPrefix>,
212        aws_connection_role_arn: Option<String>,
213        secrets_reader: Arc<dyn SecretsReader>,
214        cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
215    ) -> ConnectionContext {
216        ConnectionContext {
217            environment_id,
218            librdkafka_log_level: mz_ore::tracing::crate_level(
219                &startup_log_level.clone().into(),
220                "librdkafka",
221            ),
222            aws_external_id_prefix,
223            aws_connection_role_arn,
224            secrets_reader,
225            cloud_resource_reader,
226            ssh_tunnel_manager: SshTunnelManager::default(),
227        }
228    }
229
230    /// Constructs a new connection context for usage in tests.
231    pub fn for_tests(secrets_reader: Arc<dyn SecretsReader>) -> ConnectionContext {
232        ConnectionContext {
233            environment_id: "test-environment-id".into(),
234            librdkafka_log_level: tracing::Level::INFO,
235            aws_external_id_prefix: Some(
236                AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable(
237                    "test-aws-external-id-prefix",
238                )
239                .expect("infallible"),
240            ),
241            aws_connection_role_arn: Some(
242                "arn:aws:iam::123456789000:role/MaterializeConnection".into(),
243            ),
244            secrets_reader,
245            cloud_resource_reader: None,
246            ssh_tunnel_manager: SshTunnelManager::default(),
247        }
248    }
249}
250
/// A connection to an external system, generic over whether nested
/// connection references are still references or have been inlined
/// (see [`ConnectionAccess`]).
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Connection<C: ConnectionAccess = InlinedConnection> {
    /// A connection to a Kafka cluster.
    Kafka(KafkaConnection<C>),
    /// A connection to a Confluent Schema Registry.
    Csr(CsrConnection<C>),
    /// A connection to a PostgreSQL server.
    Postgres(PostgresConnection<C>),
    /// A connection to an SSH bastion host.
    Ssh(SshConnection),
    /// A connection to AWS.
    Aws(AwsConnection),
    /// A connection to an AWS PrivateLink service.
    AwsPrivatelink(AwsPrivatelinkConnection),
    /// A connection to a MySQL server.
    MySql(MySqlConnection<C>),
    /// A connection to a SQL Server instance.
    SqlServer(SqlServerConnectionDetails<C>),
    /// A connection to an Iceberg catalog.
    IcebergCatalog(IcebergCatalogConnection<C>),
}
263
264impl<R: ConnectionResolver> IntoInlineConnection<Connection, R>
265    for Connection<ReferencedConnection>
266{
267    fn into_inline_connection(self, r: R) -> Connection {
268        match self {
269            Connection::Kafka(kafka) => Connection::Kafka(kafka.into_inline_connection(r)),
270            Connection::Csr(csr) => Connection::Csr(csr.into_inline_connection(r)),
271            Connection::Postgres(pg) => Connection::Postgres(pg.into_inline_connection(r)),
272            Connection::Ssh(ssh) => Connection::Ssh(ssh),
273            Connection::Aws(aws) => Connection::Aws(aws),
274            Connection::AwsPrivatelink(awspl) => Connection::AwsPrivatelink(awspl),
275            Connection::MySql(mysql) => Connection::MySql(mysql.into_inline_connection(r)),
276            Connection::SqlServer(sql_server) => {
277                Connection::SqlServer(sql_server.into_inline_connection(r))
278            }
279            Connection::IcebergCatalog(iceberg) => {
280                Connection::IcebergCatalog(iceberg.into_inline_connection(r))
281            }
282        }
283    }
284}
285
286impl<C: ConnectionAccess> Connection<C> {
287    /// Whether this connection should be validated by default on creation.
288    pub fn validate_by_default(&self) -> bool {
289        match self {
290            Connection::Kafka(conn) => conn.validate_by_default(),
291            Connection::Csr(conn) => conn.validate_by_default(),
292            Connection::Postgres(conn) => conn.validate_by_default(),
293            Connection::Ssh(conn) => conn.validate_by_default(),
294            Connection::Aws(conn) => conn.validate_by_default(),
295            Connection::AwsPrivatelink(conn) => conn.validate_by_default(),
296            Connection::MySql(conn) => conn.validate_by_default(),
297            Connection::SqlServer(conn) => conn.validate_by_default(),
298            Connection::IcebergCatalog(conn) => conn.validate_by_default(),
299        }
300    }
301}
302
303impl Connection<InlinedConnection> {
304    /// Validates this connection by attempting to connect to the upstream system.
305    pub async fn validate(
306        &self,
307        id: CatalogItemId,
308        storage_configuration: &StorageConfiguration,
309    ) -> Result<(), ConnectionValidationError> {
310        match self {
311            Connection::Kafka(conn) => conn.validate(id, storage_configuration).await?,
312            Connection::Csr(conn) => conn.validate(id, storage_configuration).await?,
313            Connection::Postgres(conn) => {
314                conn.validate(id, storage_configuration).await?;
315            }
316            Connection::Ssh(conn) => conn.validate(id, storage_configuration).await?,
317            Connection::Aws(conn) => conn.validate(id, storage_configuration).await?,
318            Connection::AwsPrivatelink(conn) => conn.validate(id, storage_configuration).await?,
319            Connection::MySql(conn) => {
320                conn.validate(id, storage_configuration).await?;
321            }
322            Connection::SqlServer(conn) => {
323                conn.validate(id, storage_configuration).await?;
324            }
325            Connection::IcebergCatalog(conn) => conn.validate(id, storage_configuration).await?,
326        }
327        Ok(())
328    }
329
330    pub fn unwrap_kafka(self) -> <InlinedConnection as ConnectionAccess>::Kafka {
331        match self {
332            Self::Kafka(conn) => conn,
333            o => unreachable!("{o:?} is not a Kafka connection"),
334        }
335    }
336
337    pub fn unwrap_pg(self) -> <InlinedConnection as ConnectionAccess>::Pg {
338        match self {
339            Self::Postgres(conn) => conn,
340            o => unreachable!("{o:?} is not a Postgres connection"),
341        }
342    }
343
344    pub fn unwrap_mysql(self) -> <InlinedConnection as ConnectionAccess>::MySql {
345        match self {
346            Self::MySql(conn) => conn,
347            o => unreachable!("{o:?} is not a MySQL connection"),
348        }
349    }
350
351    pub fn unwrap_sql_server(self) -> <InlinedConnection as ConnectionAccess>::SqlServer {
352        match self {
353            Self::SqlServer(conn) => conn,
354            o => unreachable!("{o:?} is not a SQL Server connection"),
355        }
356    }
357
358    pub fn unwrap_aws(self) -> <InlinedConnection as ConnectionAccess>::Aws {
359        match self {
360            Self::Aws(conn) => conn,
361            o => unreachable!("{o:?} is not an AWS connection"),
362        }
363    }
364
365    pub fn unwrap_ssh(self) -> <InlinedConnection as ConnectionAccess>::Ssh {
366        match self {
367            Self::Ssh(conn) => conn,
368            o => unreachable!("{o:?} is not an SSH connection"),
369        }
370    }
371
372    pub fn unwrap_csr(self) -> <InlinedConnection as ConnectionAccess>::Csr {
373        match self {
374            Self::Csr(conn) => conn,
375            o => unreachable!("{o:?} is not a Kafka connection"),
376        }
377    }
378
379    pub fn unwrap_iceberg_catalog(self) -> <InlinedConnection as ConnectionAccess>::IcebergCatalog {
380        match self {
381            Self::IcebergCatalog(conn) => conn,
382            o => unreachable!("{o:?} is not an Iceberg catalog connection"),
383        }
384    }
385}
386
/// An error returned by [`Connection::validate`].
#[derive(thiserror::Error, Debug)]
pub enum ConnectionValidationError {
    /// Validating a Postgres connection failed.
    #[error(transparent)]
    Postgres(#[from] PostgresConnectionValidationError),
    /// Validating a MySQL connection failed.
    #[error(transparent)]
    MySql(#[from] MySqlConnectionValidationError),
    /// Validating a SQL Server connection failed.
    #[error(transparent)]
    SqlServer(#[from] SqlServerConnectionValidationError),
    /// Validating an AWS connection failed.
    #[error(transparent)]
    Aws(#[from] AwsConnectionValidationError),
    /// Any other validation failure, rendered with its full cause chain.
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}
401
402impl ConnectionValidationError {
403    /// Reports additional details about the error, if any are available.
404    pub fn detail(&self) -> Option<String> {
405        match self {
406            ConnectionValidationError::Postgres(e) => e.detail(),
407            ConnectionValidationError::MySql(e) => e.detail(),
408            ConnectionValidationError::SqlServer(e) => e.detail(),
409            ConnectionValidationError::Aws(e) => e.detail(),
410            ConnectionValidationError::Other(_) => None,
411        }
412    }
413
414    /// Reports a hint for the user about how the error could be fixed.
415    pub fn hint(&self) -> Option<String> {
416        match self {
417            ConnectionValidationError::Postgres(e) => e.hint(),
418            ConnectionValidationError::MySql(e) => e.hint(),
419            ConnectionValidationError::SqlServer(e) => e.hint(),
420            ConnectionValidationError::Aws(e) => e.hint(),
421            ConnectionValidationError::Other(_) => None,
422        }
423    }
424}
425
impl<C: ConnectionAccess> AlterCompatible for Connection<C> {
    /// Two connections are alter-compatible only if they are the same variant
    /// and the variant's own `alter_compatible` accepts the pair.
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        match (self, other) {
            (Self::Aws(s), Self::Aws(o)) => s.alter_compatible(id, o),
            (Self::AwsPrivatelink(s), Self::AwsPrivatelink(o)) => s.alter_compatible(id, o),
            (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o),
            (Self::Csr(s), Self::Csr(o)) => s.alter_compatible(id, o),
            (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
            (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
            (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
            // NOTE(review): `SqlServer` and `IcebergCatalog` pairs are not
            // matched above, so altering between two connections of either
            // kind always takes the error path below. For `IcebergCatalog`
            // this matches its own always-`Err` impl; confirm the `SqlServer`
            // omission is intentional rather than a missed arm.
            _ => {
                tracing::warn!(
                    "Connection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );
                Err(AlterError { id })
            }
        }
    }
}
447
/// Configuration for a generic Iceberg REST catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct RestIcebergCatalog {
    /// For REST catalogs, the oauth2 credential in a `CLIENT_ID:CLIENT_SECRET` format
    pub credential: StringOrSecret,
    /// The oauth2 scope for REST catalogs
    pub scope: Option<String>,
    /// The warehouse for REST catalogs
    pub warehouse: Option<String>,
}
457
/// Configuration for an AWS S3 Tables Iceberg REST catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct S3TablesRestIcebergCatalog<C: ConnectionAccess = InlinedConnection> {
    /// The AWS connection details, for s3tables
    pub aws_connection: AwsConnectionReference<C>,
    /// The warehouse for s3tables
    pub warehouse: String,
}
465
466impl<R: ConnectionResolver> IntoInlineConnection<S3TablesRestIcebergCatalog, R>
467    for S3TablesRestIcebergCatalog<ReferencedConnection>
468{
469    fn into_inline_connection(self, r: R) -> S3TablesRestIcebergCatalog {
470        S3TablesRestIcebergCatalog {
471            aws_connection: self.aws_connection.into_inline_connection(&r),
472            warehouse: self.warehouse,
473        }
474    }
475}
476
/// The kind of Iceberg catalog a connection targets.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogType {
    /// A generic Iceberg REST catalog.
    Rest,
    /// An AWS S3 Tables Iceberg REST catalog.
    S3TablesRest,
}
482
/// Variant-specific configuration for an Iceberg catalog connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogImpl<C: ConnectionAccess = InlinedConnection> {
    /// A generic Iceberg REST catalog.
    Rest(RestIcebergCatalog),
    /// An AWS S3 Tables Iceberg REST catalog.
    S3TablesRest(S3TablesRestIcebergCatalog<C>),
}
488
489impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogImpl, R>
490    for IcebergCatalogImpl<ReferencedConnection>
491{
492    fn into_inline_connection(self, r: R) -> IcebergCatalogImpl {
493        match self {
494            IcebergCatalogImpl::Rest(rest) => IcebergCatalogImpl::Rest(rest),
495            IcebergCatalogImpl::S3TablesRest(s3tables) => {
496                IcebergCatalogImpl::S3TablesRest(s3tables.into_inline_connection(r))
497            }
498        }
499    }
500}
501
/// A connection to an Iceberg catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergCatalogConnection<C: ConnectionAccess = InlinedConnection> {
    /// The catalog implementation in use (REST or S3 Tables REST).
    pub catalog: IcebergCatalogImpl<C>,
    /// Where the catalog is located
    pub uri: reqwest::Url,
}
509
impl AlterCompatible for IcebergCatalogConnection {
    /// Iceberg catalog connections currently reject every alteration
    /// unconditionally.
    fn alter_compatible(&self, id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        Err(AlterError { id })
    }
}
515
516impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogConnection, R>
517    for IcebergCatalogConnection<ReferencedConnection>
518{
519    fn into_inline_connection(self, r: R) -> IcebergCatalogConnection {
520        IcebergCatalogConnection {
521            catalog: self.catalog.into_inline_connection(&r),
522            uri: self.uri,
523        }
524    }
525}
526
impl<C: ConnectionAccess> IcebergCatalogConnection<C> {
    /// Whether this connection should be validated by default on creation.
    fn validate_by_default(&self) -> bool {
        true
    }
}
532
533impl IcebergCatalogConnection<InlinedConnection> {
534    pub async fn connect(
535        &self,
536        storage_configuration: &StorageConfiguration,
537        in_task: InTask,
538    ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
539        match self.catalog {
540            IcebergCatalogImpl::S3TablesRest(ref s3tables) => {
541                self.connect_s3tables(s3tables, storage_configuration, in_task)
542                    .await
543            }
544            IcebergCatalogImpl::Rest(ref rest) => {
545                self.connect_rest(rest, storage_configuration, in_task)
546                    .await
547            }
548        }
549    }
550
551    pub fn catalog_type(&self) -> IcebergCatalogType {
552        match self.catalog {
553            IcebergCatalogImpl::S3TablesRest(_) => IcebergCatalogType::S3TablesRest,
554            IcebergCatalogImpl::Rest(_) => IcebergCatalogType::Rest,
555        }
556    }
557
558    pub fn s3tables_catalog(&self) -> Option<&S3TablesRestIcebergCatalog> {
559        match &self.catalog {
560            IcebergCatalogImpl::S3TablesRest(s3tables) => Some(s3tables),
561            IcebergCatalogImpl::Rest(_) => None,
562        }
563    }
564
565    pub fn rest_catalog(&self) -> Option<&RestIcebergCatalog> {
566        match &self.catalog {
567            IcebergCatalogImpl::Rest(rest) => Some(rest),
568            IcebergCatalogImpl::S3TablesRest(_) => None,
569        }
570    }
571
572    async fn connect_s3tables(
573        &self,
574        s3tables: &S3TablesRestIcebergCatalog,
575        storage_configuration: &StorageConfiguration,
576        in_task: InTask,
577    ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
578        let secret_reader = &storage_configuration.connection_context.secrets_reader;
579        let aws_ref = &s3tables.aws_connection;
580        let aws_config = aws_ref
581            .connection
582            .load_sdk_config(
583                &storage_configuration.connection_context,
584                aws_ref.connection_id,
585                in_task,
586            )
587            .await
588            .with_context(|| {
589                format!(
590                    "failed to load AWS SDK config for S3 Tables Iceberg catalog \
591                     (connection id: {}, auth method: {}, catalog uri: {}, warehouse: {})",
592                    aws_ref.connection_id,
593                    aws_ref.connection.auth_method(),
594                    self.uri,
595                    s3tables.warehouse
596                )
597            })?;
598
599        let aws_region = aws_ref
600            .connection
601            .region
602            .clone()
603            .unwrap_or_else(|| "us-east-1".to_string());
604
605        let mut props = vec![
606            (S3_REGION.to_string(), aws_region),
607            (S3_DISABLE_EC2_METADATA.to_string(), "true".to_string()),
608            (
609                REST_CATALOG_PROP_WAREHOUSE.to_string(),
610                s3tables.warehouse.clone(),
611            ),
612            (REST_CATALOG_PROP_URI.to_string(), self.uri.to_string()),
613        ];
614
615        let aws_auth = aws_ref.connection.auth.clone();
616
617        if let AwsAuth::Credentials(creds) = &aws_auth {
618            props.push((
619                S3_ACCESS_KEY_ID.to_string(),
620                creds
621                    .access_key_id
622                    .get_string(in_task, secret_reader)
623                    .await?,
624            ));
625            props.push((
626                S3_SECRET_ACCESS_KEY.to_string(),
627                secret_reader.read_string(creds.secret_access_key).await?,
628            ));
629        }
630
631        // Build the catalog with aws_config for REST API signing.
632        // For AssumeRole auth, we also add a FileIO extension so OpenDAL can
633        // use our credential chain for S3 object access.
634        let catalog = RestCatalogBuilder::default()
635            .with_aws_config(aws_config.clone())
636            .load("IcebergCatalog", props.into_iter().collect())
637            .await
638            .with_context(|| {
639                format!(
640                    "failed to create S3 Tables Iceberg catalog \
641                     (connection id: {}, catalog uri: {}, warehouse: {})",
642                    aws_ref.connection_id, self.uri, s3tables.warehouse
643                )
644            })?;
645
646        let catalog = if matches!(aws_auth, AwsAuth::AssumeRole(_)) {
647            let credentials_provider = aws_config
648                .credentials_provider()
649                .ok_or_else(|| anyhow!("aws_config missing credentials provider"))?;
650            let file_io_loader = CustomAwsCredentialLoader::new(Arc::new(
651                AwsSdkCredentialLoader::new(credentials_provider),
652            ));
653            catalog.with_file_io_extension(file_io_loader)
654        } else {
655            catalog
656        };
657
658        Ok(Arc::new(catalog))
659    }
660
661    async fn connect_rest(
662        &self,
663        rest: &RestIcebergCatalog,
664        storage_configuration: &StorageConfiguration,
665        in_task: InTask,
666    ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
667        let mut props = BTreeMap::from([(
668            REST_CATALOG_PROP_URI.to_string(),
669            self.uri.to_string().clone(),
670        )]);
671
672        if let Some(warehouse) = &rest.warehouse {
673            props.insert(REST_CATALOG_PROP_WAREHOUSE.to_string(), warehouse.clone());
674        }
675
676        let credential = rest
677            .credential
678            .get_string(
679                in_task,
680                &storage_configuration.connection_context.secrets_reader,
681            )
682            .await
683            .map_err(|e| anyhow!("failed to read Iceberg catalog credential: {e}"))?;
684        props.insert(REST_CATALOG_PROP_CREDENTIAL.to_string(), credential);
685
686        if let Some(scope) = &rest.scope {
687            props.insert(REST_CATALOG_PROP_SCOPE.to_string(), scope.clone());
688        }
689
690        let catalog = RestCatalogBuilder::default()
691            .load("IcebergCatalog", props.into_iter().collect())
692            .await
693            .map_err(|e| anyhow!("failed to create Iceberg catalog: {e}"))?;
694        Ok(Arc::new(catalog))
695    }
696
697    async fn validate(
698        &self,
699        _id: CatalogItemId,
700        storage_configuration: &StorageConfiguration,
701    ) -> Result<(), ConnectionValidationError> {
702        let catalog = self
703            .connect(storage_configuration, InTask::No)
704            .await
705            .map_err(|e| {
706                ConnectionValidationError::Other(anyhow!("failed to connect to catalog: {e}"))
707            })?;
708
709        // If we can list namespaces, the connection is valid.
710        catalog.list_namespaces(None).await.map_err(|e| {
711            ConnectionValidationError::Other(anyhow!("failed to list namespaces: {e}"))
712        })?;
713
714        Ok(())
715    }
716}
717
/// A connection to an AWS PrivateLink service.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelinkConnection {
    /// The name of the PrivateLink service to connect to.
    pub service_name: String,
    /// The availability zones in which the service is available.
    pub availability_zones: Vec<String>,
}
723
impl AlterCompatible for AwsPrivatelinkConnection {
    /// Always compatible: every field of the connection may be altered.
    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        // Every element of the AwsPrivatelinkConnection connection is configurable.
        Ok(())
    }
}
730
/// TLS configuration for a Kafka connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaTlsConfig {
    /// The client TLS identity (certificate and key), if any.
    pub identity: Option<TlsIdentity>,
    /// The CA certificate used to verify the broker, if any.
    pub root_cert: Option<StringOrSecret>,
}
736
/// SASL configuration for a Kafka connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaSaslConfig<C: ConnectionAccess = InlinedConnection> {
    /// The SASL mechanism name (e.g. as understood by librdkafka).
    pub mechanism: String,
    /// The SASL username.
    pub username: StringOrSecret,
    /// The secret holding the SASL password, if any.
    pub password: Option<CatalogItemId>,
    /// An AWS connection for IAM-based authentication, if any.
    pub aws: Option<AwsConnectionReference<C>>,
}
744
745impl<R: ConnectionResolver> IntoInlineConnection<KafkaSaslConfig, R>
746    for KafkaSaslConfig<ReferencedConnection>
747{
748    fn into_inline_connection(self, r: R) -> KafkaSaslConfig {
749        KafkaSaslConfig {
750            mechanism: self.mechanism,
751            username: self.username,
752            password: self.password,
753            aws: self.aws.map(|aws| aws.into_inline_connection(&r)),
754        }
755    }
756}
757
/// Specifies a Kafka broker in a [`KafkaConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaBroker<C: ConnectionAccess = InlinedConnection> {
    /// The address of the Kafka broker.
    pub address: String,
    /// An optional tunnel to use when connecting to the broker.
    pub tunnel: Tunnel<C>,
}
766
767impl<R: ConnectionResolver> IntoInlineConnection<KafkaBroker, R>
768    for KafkaBroker<ReferencedConnection>
769{
770    fn into_inline_connection(self, r: R) -> KafkaBroker {
771        let KafkaBroker { address, tunnel } = self;
772        KafkaBroker {
773            address,
774            tunnel: tunnel.into_inline_connection(r),
775        }
776    }
777}
778
/// Options for creating a Kafka topic.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Default)]
pub struct KafkaTopicOptions {
    /// The replication factor for the topic.
    /// If `None`, the broker default will be used.
    pub replication_factor: Option<NonNeg<i32>>,
    /// The number of partitions to create.
    /// If `None`, the broker default will be used.
    pub partition_count: Option<NonNeg<i32>>,
    /// The initial configuration parameters for the topic.
    pub topic_config: BTreeMap<String, String>,
}
790
/// A connection to a Kafka cluster.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaConnection<C: ConnectionAccess = InlinedConnection> {
    /// The brokers to bootstrap from.
    pub brokers: Vec<KafkaBroker<C>>,
    /// A tunnel through which to route traffic,
    /// that can be overridden for individual brokers
    /// in `brokers`.
    pub default_tunnel: Tunnel<C>,
    /// The name of the progress topic, if explicitly configured; see
    /// [`KafkaConnection::progress_topic`] for the default.
    pub progress_topic: Option<String>,
    /// Topic creation options for the progress topic.
    pub progress_topic_options: KafkaTopicOptions,
    /// Additional librdkafka-style configuration options.
    pub options: BTreeMap<String, StringOrSecret>,
    /// TLS configuration, if any.
    pub tls: Option<KafkaTlsConfig>,
    /// SASL configuration, if any.
    pub sasl: Option<KafkaSaslConfig<C>>,
}
804
805impl<R: ConnectionResolver> IntoInlineConnection<KafkaConnection, R>
806    for KafkaConnection<ReferencedConnection>
807{
808    fn into_inline_connection(self, r: R) -> KafkaConnection {
809        let KafkaConnection {
810            brokers,
811            progress_topic,
812            progress_topic_options,
813            default_tunnel,
814            options,
815            tls,
816            sasl,
817        } = self;
818
819        let brokers = brokers
820            .into_iter()
821            .map(|broker| broker.into_inline_connection(&r))
822            .collect();
823
824        KafkaConnection {
825            brokers,
826            progress_topic,
827            progress_topic_options,
828            default_tunnel: default_tunnel.into_inline_connection(&r),
829            options,
830            tls,
831            sasl: sasl.map(|sasl| sasl.into_inline_connection(&r)),
832        }
833    }
834}
835
836impl<C: ConnectionAccess> KafkaConnection<C> {
837    /// Returns the name of the progress topic to use for the connection.
838    ///
839    /// The caller is responsible for providing the connection ID as it is not
840    /// known to `KafkaConnection`.
841    pub fn progress_topic(
842        &self,
843        connection_context: &ConnectionContext,
844        connection_id: CatalogItemId,
845    ) -> Cow<'_, str> {
846        if let Some(progress_topic) = &self.progress_topic {
847            Cow::Borrowed(progress_topic)
848        } else {
849            Cow::Owned(format!(
850                "_materialize-progress-{}-{}",
851                connection_context.environment_id, connection_id,
852            ))
853        }
854    }
855
856    fn validate_by_default(&self) -> bool {
857        true
858    }
859}
860
impl KafkaConnection {
    /// Generates a string that can be used as the base for a configuration ID
    /// (e.g., `client.id`, `group.id`, `transactional.id`) for a Kafka source
    /// or sink.
    pub fn id_base(
        connection_context: &ConnectionContext,
        connection_id: CatalogItemId,
        object_id: GlobalId,
    ) -> String {
        format!(
            "materialize-{}-{}-{}",
            connection_context.environment_id, connection_id, object_id,
        )
    }

    /// Enriches the provided `client_id` according to any enrichment rules in
    /// the `kafka_client_id_enrichment_rules` configuration parameter.
    ///
    /// Each rule whose pattern matches any configured broker address appends
    /// its payload to `client_id`, separated by a hyphen.
    pub fn enrich_client_id(&self, configs: &ConfigSet, client_id: &mut String) {
        // One enrichment rule, decoded from the JSON configuration value.
        #[derive(Debug, Deserialize)]
        struct EnrichmentRule {
            #[serde(deserialize_with = "deserialize_regex")]
            pattern: Regex,
            payload: String,
        }

        // `Regex` is deserialized from its string form.
        fn deserialize_regex<'de, D>(deserializer: D) -> Result<Regex, D::Error>
        where
            D: Deserializer<'de>,
        {
            let buf = String::deserialize(deserializer)?;
            Regex::new(&buf).map_err(serde::de::Error::custom)
        }

        let rules = KAFKA_CLIENT_ID_ENRICHMENT_RULES.get(configs);
        let rules = match serde_json::from_value::<Vec<EnrichmentRule>>(rules) {
            Ok(rules) => rules,
            Err(e) => {
                // A malformed configuration value must not break client
                // creation; warn and leave `client_id` untouched.
                warn!(%e, "failed to decode kafka_client_id_enrichment_rules");
                return;
            }
        };

        // Check every rule against every broker. Rules are matched in the order
        // that they are specified. It is usually a configuration error if
        // multiple rules match the same list of Kafka brokers, but we
        // nonetheless want to provide well defined semantics.
        debug!(?self.brokers, "evaluating client ID enrichment rules");
        for rule in rules {
            let is_match = self
                .brokers
                .iter()
                .any(|b| rule.pattern.is_match(&b.address));
            debug!(?rule, is_match, "evaluated client ID enrichment rule");
            if is_match {
                client_id.push('-');
                client_id.push_str(&rule.payload);
            }
        }
    }

    /// Creates a Kafka client for the connection.
    ///
    /// The client's configuration is assembled from the connection's
    /// `options`, the derived security settings (TLS/SASL), system-configured
    /// backoff parameters, and `extra_options` (applied last). Secret values
    /// are read via the storage configuration's secrets reader; `in_task`
    /// controls whether those reads are performed in a task. Any configured
    /// AWS PrivateLink or SSH tunnels are wired into the returned client's
    /// [`TunnelingClientContext`].
    pub async fn create_with_context<C, T>(
        &self,
        storage_configuration: &StorageConfiguration,
        context: C,
        extra_options: &BTreeMap<&str, String>,
        in_task: InTask,
    ) -> Result<T, ContextCreationError>
    where
        C: ClientContext,
        T: FromClientConfigAndContext<TunnelingClientContext<C>>,
    {
        let mut options = self.options.clone();

        // Ensure that Kafka topics are *not* automatically created when
        // consuming, producing, or fetching metadata for a topic. This ensures
        // that we don't accidentally create topics with the wrong number of
        // partitions.
        options.insert("allow.auto.create.topics".into(), "false".into());

        let brokers = match &self.default_tunnel {
            Tunnel::AwsPrivatelink(t) => {
                // A default PrivateLink tunnel is mutually exclusive with an
                // explicit broker list.
                assert!(&self.brokers.is_empty());

                let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM
                    .get(storage_configuration.config_set());
                options.insert("ssl.endpoint.identification.algorithm".into(), algo.into());

                // When using a default privatelink tunnel broker/brokers cannot be specified
                // instead the tunnel connection_id and port are used for the initial connection.
                format!(
                    "{}:{}",
                    vpc_endpoint_host(
                        t.connection_id,
                        None, // Default tunnel does not support availability zones.
                    ),
                    t.port.unwrap_or(9092)
                )
            }
            _ => self.brokers.iter().map(|b| &b.address).join(","),
        };
        options.insert("bootstrap.servers".into(), brokers.into());
        // The librdkafka security protocol is implied by which of TLS and
        // SASL are configured.
        let security_protocol = match (self.tls.is_some(), self.sasl.is_some()) {
            (false, false) => "PLAINTEXT",
            (true, false) => "SSL",
            (false, true) => "SASL_PLAINTEXT",
            (true, true) => "SASL_SSL",
        };
        options.insert("security.protocol".into(), security_protocol.into());
        if let Some(tls) = &self.tls {
            if let Some(root_cert) = &tls.root_cert {
                options.insert("ssl.ca.pem".into(), root_cert.clone());
            }
            if let Some(identity) = &tls.identity {
                // The private key is always a secret; the certificate may be
                // inline or a secret.
                options.insert("ssl.key.pem".into(), StringOrSecret::Secret(identity.key));
                options.insert("ssl.certificate.pem".into(), identity.cert.clone());
            }
        }
        if let Some(sasl) = &self.sasl {
            options.insert("sasl.mechanisms".into(), (&sasl.mechanism).into());
            options.insert("sasl.username".into(), sasl.username.clone());
            if let Some(password) = sasl.password {
                options.insert("sasl.password".into(), StringOrSecret::Secret(password));
            }
        }

        // Apply the system-configured retry/reconnect backoff parameters.
        options.insert(
            "retry.backoff.ms".into(),
            KAFKA_RETRY_BACKOFF
                .get(storage_configuration.config_set())
                .as_millis()
                .into(),
        );
        options.insert(
            "retry.backoff.max.ms".into(),
            KAFKA_RETRY_BACKOFF_MAX
                .get(storage_configuration.config_set())
                .as_millis()
                .into(),
        );
        options.insert(
            "reconnect.backoff.ms".into(),
            KAFKA_RECONNECT_BACKOFF
                .get(storage_configuration.config_set())
                .as_millis()
                .into(),
        );
        options.insert(
            "reconnect.backoff.max.ms".into(),
            KAFKA_RECONNECT_BACKOFF_MAX
                .get(storage_configuration.config_set())
                .as_millis()
                .into(),
        );

        let mut config = mz_kafka_util::client::create_new_client_config(
            storage_configuration
                .connection_context
                .librdkafka_log_level,
            storage_configuration.parameters.kafka_timeout_config,
        );
        // Resolve every option to a concrete string (reading secrets as
        // needed) and apply it to the client configuration.
        for (k, v) in options {
            config.set(
                k,
                v.get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await
                .context("reading kafka secret")?,
            );
        }
        // `extra_options` is applied after `options`, so callers can override
        // any of the settings above.
        for (k, v) in extra_options {
            config.set(*k, v);
        }

        // Load the AWS SDK config only when SASL is configured with an AWS
        // connection.
        let aws_config = match self.sasl.as_ref().and_then(|sasl| sasl.aws.as_ref()) {
            None => None,
            Some(aws) => Some(
                aws.connection
                    .load_sdk_config(
                        &storage_configuration.connection_context,
                        aws.connection_id,
                        in_task,
                    )
                    .await?,
            ),
        };

        // TODO(roshan): Implement enforcement of external address validation once
        // rdkafka client has been updated to support providing multiple resolved
        // addresses for brokers
        let mut context = TunnelingClientContext::new(
            context,
            Handle::current(),
            storage_configuration
                .connection_context
                .ssh_tunnel_manager
                .clone(),
            storage_configuration.parameters.ssh_timeout_config,
            aws_config,
            in_task,
        );

        // Install the connection-wide default tunnel on the context.
        match &self.default_tunnel {
            Tunnel::Direct => {
                // By default, don't offer a default override for broker address lookup.
            }
            Tunnel::AwsPrivatelink(pl) => {
                context.set_default_tunnel(TunnelConfig::StaticHost(vpc_endpoint_host(
                    pl.connection_id,
                    None, // Default tunnel does not support availability zones.
                )));
            }
            Tunnel::Ssh(ssh_tunnel) => {
                let secret = storage_configuration
                    .connection_context
                    .secrets_reader
                    .read_in_task_if(in_task, ssh_tunnel.connection_id)
                    .await?;
                let key_pair = SshKeyPair::from_bytes(&secret)?;

                // Ensure any ssh-bastion address we connect to is resolved to an external address.
                let resolved = resolve_address(
                    &ssh_tunnel.connection.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig {
                    host: resolved
                        .iter()
                        .map(|a| a.to_string())
                        .collect::<BTreeSet<_>>(),
                    port: ssh_tunnel.connection.port,
                    user: ssh_tunnel.connection.user.clone(),
                    key_pair,
                }));
            }
        }

        // Install per-broker tunnel overrides, keyed by the broker's parsed
        // `host:port` address (port defaults to 9092).
        for broker in &self.brokers {
            let mut addr_parts = broker.address.splitn(2, ':');
            let addr = BrokerAddr {
                host: addr_parts
                    .next()
                    .context("BROKER is not address:port")?
                    .into(),
                port: addr_parts
                    .next()
                    .unwrap_or("9092")
                    .parse()
                    .context("parsing BROKER port")?,
            };
            match &broker.tunnel {
                Tunnel::Direct => {
                    // By default, don't override broker address lookup.
                    //
                    // N.B.
                    //
                    // We _could_ pre-setup the default ssh tunnel for all known brokers here, but
                    // we avoid doing because:
                    // - Its not necessary.
                    // - Not doing so makes it easier to test the `FailedDefaultSshTunnel` path
                    // in the `TunnelingClientContext`.
                }
                Tunnel::AwsPrivatelink(aws_privatelink) => {
                    let host = mz_cloud_resources::vpc_endpoint_host(
                        aws_privatelink.connection_id,
                        aws_privatelink.availability_zone.as_deref(),
                    );
                    let port = aws_privatelink.port;
                    context.add_broker_rewrite(
                        addr,
                        BrokerRewrite {
                            host: host.clone(),
                            port,
                        },
                    );
                }
                Tunnel::Ssh(ssh_tunnel) => {
                    // Ensure any SSH bastion address we connect to is resolved to an external address.
                    let ssh_host_resolved = resolve_address(
                        &ssh_tunnel.connection.host,
                        ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                    )
                    .await?;
                    context
                        .add_ssh_tunnel(
                            addr,
                            SshTunnelConfig {
                                host: ssh_host_resolved
                                    .iter()
                                    .map(|a| a.to_string())
                                    .collect::<BTreeSet<_>>(),
                                port: ssh_tunnel.connection.port,
                                user: ssh_tunnel.connection.user.clone(),
                                key_pair: SshKeyPair::from_bytes(
                                    &storage_configuration
                                        .connection_context
                                        .secrets_reader
                                        .read_in_task_if(in_task, ssh_tunnel.connection_id)
                                        .await?,
                                )?,
                            },
                        )
                        .await
                        .map_err(ContextCreationError::Ssh)?;
                }
            }
        }

        Ok(config.create_with_context(context)?)
    }

    /// Validates the connection by attempting to fetch metadata from the
    /// Kafka cluster.
    async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<(), anyhow::Error> {
        let (context, error_rx) = MzClientContext::with_errors();
        let consumer: BaseConsumer<_> = self
            .create_with_context(
                storage_configuration,
                context,
                &BTreeMap::new(),
                // We are in a normal tokio context during validation, already.
                InTask::No,
            )
            .await?;
        let consumer = Arc::new(consumer);

        let timeout = storage_configuration
            .parameters
            .kafka_timeout_config
            .fetch_metadata_timeout;

        // librdkafka doesn't expose an API for determining whether a connection to
        // the Kafka cluster has been successfully established. So we make a
        // metadata request, though we don't care about the results, so that we can
        // report any errors making that request. If the request succeeds, we know
        // we were able to contact at least one broker, and that's a good proxy for
        // being able to contact all the brokers in the cluster.
        //
        // The downside of this approach is it produces a generic error message like
        // "metadata fetch error" with no additional details. The real networking
        // error is buried in the librdkafka logs, which are not visible to users.
        let result = mz_ore::task::spawn_blocking(|| "kafka_get_metadata", {
            let consumer = Arc::clone(&consumer);
            move || consumer.fetch_metadata(None, timeout)
        })
        .await;
        match result {
            Ok(_) => Ok(()),
            // The error returned by `fetch_metadata` does not provide any details which makes for
            // a crappy user facing error message. For this reason we attempt to grab a better
            // error message from the client context, which should contain any error logs emitted
            // by librdkafka, and fallback to the generic error if there is nothing there.
            Err(err) => {
                // Multiple errors might have been logged during this validation but some are more
                // relevant than others. Specifically, we prefer non-internal errors over internal
                // errors since those give much more useful information to the users.
                let main_err = error_rx.try_iter().reduce(|cur, new| match cur {
                    MzKafkaError::Internal(_) => new,
                    _ => cur,
                });

                // Don't drop the consumer until after we've drained the errors
                // channel. Dropping the consumer can introduce spurious errors.
                // See database-issues#7432.
                drop(consumer);

                match main_err {
                    Some(err) => Err(err.into()),
                    None => Err(err.into()),
                }
            }
        }
    }
}
1240
1241impl<C: ConnectionAccess> AlterCompatible for KafkaConnection<C> {
1242    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1243        let KafkaConnection {
1244            brokers: _,
1245            default_tunnel: _,
1246            progress_topic,
1247            progress_topic_options,
1248            options: _,
1249            tls: _,
1250            sasl: _,
1251        } = self;
1252
1253        let compatibility_checks = [
1254            (progress_topic == &other.progress_topic, "progress_topic"),
1255            (
1256                progress_topic_options == &other.progress_topic_options,
1257                "progress_topic_options",
1258            ),
1259        ];
1260
1261        for (compatible, field) in compatibility_checks {
1262            if !compatible {
1263                tracing::warn!(
1264                    "KafkaConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1265                    self,
1266                    other
1267                );
1268
1269                return Err(AlterError { id });
1270            }
1271        }
1272
1273        Ok(())
1274    }
1275}
1276
/// A connection to a Confluent Schema Registry.
///
/// Traffic may optionally be routed through an SSH or AWS PrivateLink tunnel
/// via `tunnel`.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnection<C: ConnectionAccess = InlinedConnection> {
    /// The URL of the schema registry.
    pub url: Url,
    /// Trusted root TLS certificate in PEM format.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication with the schema
    /// registry.
    pub tls_identity: Option<TlsIdentity>,
    /// Optional HTTP authentication credentials for the schema registry.
    pub http_auth: Option<CsrConnectionHttpAuth>,
    /// A tunnel through which to route traffic.
    pub tunnel: Tunnel<C>,
}
1292
1293impl<R: ConnectionResolver> IntoInlineConnection<CsrConnection, R>
1294    for CsrConnection<ReferencedConnection>
1295{
1296    fn into_inline_connection(self, r: R) -> CsrConnection {
1297        let CsrConnection {
1298            url,
1299            tls_root_cert,
1300            tls_identity,
1301            http_auth,
1302            tunnel,
1303        } = self;
1304        CsrConnection {
1305            url,
1306            tls_root_cert,
1307            tls_identity,
1308            http_auth,
1309            tunnel: tunnel.into_inline_connection(r),
1310        }
1311    }
1312}
1313
1314impl<C: ConnectionAccess> CsrConnection<C> {
1315    fn validate_by_default(&self) -> bool {
1316        true
1317    }
1318}
1319
impl CsrConnection {
    /// Constructs a schema registry client from the connection.
    ///
    /// Resolves any referenced secrets (root certificate, TLS identity key,
    /// HTTP credentials) via the storage configuration's secrets reader —
    /// `in_task` controls whether those reads happen in a task — and routes
    /// traffic according to the configured tunnel.
    pub async fn connect(
        &self,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_ccsr::Client, CsrConnectError> {
        let mut client_config = mz_ccsr::ClientConfig::new(self.url.clone());
        if let Some(root_cert) = &self.tls_root_cert {
            let root_cert = root_cert
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let root_cert = Certificate::from_pem(root_cert.as_bytes())?;
            client_config = client_config.add_root_certificate(root_cert);
        }

        if let Some(tls_identity) = &self.tls_identity {
            // The private key is always a secret; the certificate may be
            // inline or a secret.
            let key = &storage_configuration
                .connection_context
                .secrets_reader
                .read_string_in_task_if(in_task, tls_identity.key)
                .await?;
            let cert = tls_identity
                .cert
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let ident = Identity::from_pem(key.as_bytes(), cert.as_bytes())?;
            client_config = client_config.identity(ident);
        }

        if let Some(http_auth) = &self.http_auth {
            let username = http_auth
                .username
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let password = match http_auth.password {
                None => None,
                Some(password) => Some(
                    storage_configuration
                        .connection_context
                        .secrets_reader
                        .read_string_in_task_if(in_task, password)
                        .await?,
                ),
            };
            client_config = client_config.auth(username, password);
        }

        // TODO: use types to enforce that the URL has a string hostname.
        let host = self
            .url
            .host_str()
            .ok_or_else(|| anyhow!("url missing host"))?;
        match &self.tunnel {
            Tunnel::Direct => {
                // Ensure any host we connect to is resolved to an external address.
                let resolved = resolve_address(
                    host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                client_config = client_config.resolve_to_addrs(
                    host,
                    &resolved
                        .iter()
                        .map(|addr| SocketAddr::new(*addr, 0))
                        .collect::<Vec<_>>(),
                )
            }
            Tunnel::Ssh(ssh_tunnel) => {
                let ssh_tunnel = ssh_tunnel
                    .connect(
                        storage_configuration,
                        host,
                        // Default to the default http port, but this
                        // could default to 8081...
                        self.url.port().unwrap_or(80),
                        in_task,
                    )
                    .await
                    .map_err(CsrConnectError::Ssh)?;

                // Carefully inject the SSH tunnel into the client
                // configuration. This is delicate because we need TLS
                // verification to continue to use the remote hostname rather
                // than the tunnel hostname.

                client_config = client_config
                    // `resolve_to_addrs` allows us to rewrite the hostname
                    // at the DNS level, which means the TCP connection is
                    // correctly routed through the tunnel, but TLS verification
                    // is still performed against the remote hostname.
                    // Unfortunately the port here is ignored if the URL also
                    // specifies a port...
                    .resolve_to_addrs(host, &[SocketAddr::new(ssh_tunnel.local_addr().ip(), 0)])
                    // ...so we also dynamically rewrite the URL to use the
                    // current port for the SSH tunnel.
                    //
                    // WARNING: this is brittle, because we only dynamically
                    // update the client configuration with the tunnel *port*,
                    // and not the hostname This works fine in practice, because
                    // only the SSH tunnel port will change if the tunnel fails
                    // and has to be restarted (the hostname is always
                    // 127.0.0.1)--but this is an an implementation detail of
                    // the SSH tunnel code that we're relying on.
                    .dynamic_url({
                        let remote_url = self.url.clone();
                        move || {
                            let mut url = remote_url.clone();
                            url.set_port(Some(ssh_tunnel.local_addr().port()))
                                .expect("cannot fail");
                            url
                        }
                    });
            }
            Tunnel::AwsPrivatelink(connection) => {
                // PrivateLink tunnels for CSR do not carry a port override.
                assert_none!(connection.port);

                let privatelink_host = mz_cloud_resources::vpc_endpoint_host(
                    connection.connection_id,
                    connection.availability_zone.as_deref(),
                );
                let addrs: Vec<_> = net::lookup_host((privatelink_host, 0))
                    .await
                    .context("resolving PrivateLink host")?
                    .collect();
                client_config = client_config.resolve_to_addrs(host, &addrs)
            }
        }

        Ok(client_config.build()?)
    }

    /// Validates the connection by listing the registry's subjects.
    async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<(), anyhow::Error> {
        let client = self
            .connect(
                storage_configuration,
                // We are in a normal tokio context during validation, already.
                InTask::No,
            )
            .await?;
        client.list_subjects().await?;
        Ok(())
    }
}
1478
1479impl<C: ConnectionAccess> AlterCompatible for CsrConnection<C> {
1480    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1481        let CsrConnection {
1482            tunnel,
1483            // All non-tunnel fields may change
1484            url: _,
1485            tls_root_cert: _,
1486            tls_identity: _,
1487            http_auth: _,
1488        } = self;
1489
1490        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1491
1492        for (compatible, field) in compatibility_checks {
1493            if !compatible {
1494                tracing::warn!(
1495                    "CsrConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1496                    self,
1497                    other
1498                );
1499
1500                return Err(AlterError { id });
1501            }
1502        }
1503        Ok(())
1504    }
1505}
1506
/// A TLS key pair used for client identity.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct TlsIdentity {
    /// The client's TLS public certificate in PEM format; may be an inline
    /// string or a reference to a secret.
    pub cert: StringOrSecret,
    /// The ID of the secret containing the client's TLS private key in PEM
    /// format. The key is always stored as a secret, never inline.
    pub key: CatalogItemId,
}
1516
/// HTTP authentication credentials in a [`CsrConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnectionHttpAuth {
    /// The username; may be an inline string or a reference to a secret.
    pub username: StringOrSecret,
    /// The ID of the secret containing the password, if any.
    pub password: Option<CatalogItemId>,
}
1525
/// A connection to a PostgreSQL server.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct PostgresConnection<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// The name of the database to connect to.
    pub database: String,
    /// The username to authenticate as; may be an inline string or a
    /// reference to a secret.
    pub user: StringOrSecret,
    /// An optional password for authentication, as the ID of the secret
    /// containing the password.
    pub password: Option<CatalogItemId>,
    /// A tunnel through which to route traffic.
    pub tunnel: Tunnel<C>,
    /// Whether to use TLS for encryption, authentication, or both.
    pub tls_mode: SslMode,
    /// An optional root TLS certificate in PEM format, to verify the server's
    /// identity.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication.
    pub tls_identity: Option<TlsIdentity>,
}
1549
1550impl<R: ConnectionResolver> IntoInlineConnection<PostgresConnection, R>
1551    for PostgresConnection<ReferencedConnection>
1552{
1553    fn into_inline_connection(self, r: R) -> PostgresConnection {
1554        let PostgresConnection {
1555            host,
1556            port,
1557            database,
1558            user,
1559            password,
1560            tunnel,
1561            tls_mode,
1562            tls_root_cert,
1563            tls_identity,
1564        } = self;
1565
1566        PostgresConnection {
1567            host,
1568            port,
1569            database,
1570            user,
1571            password,
1572            tunnel: tunnel.into_inline_connection(r),
1573            tls_mode,
1574            tls_root_cert,
1575            tls_identity,
1576        }
1577    }
1578}
1579
impl<C: ConnectionAccess> PostgresConnection<C> {
    /// Reports whether this connection type is validated by default
    /// (always true for PostgreSQL connections).
    fn validate_by_default(&self) -> bool {
        true
    }
}
1585
impl PostgresConnection<InlinedConnection> {
    /// Builds a [`mz_postgres_util::Config`] for this connection, reading any
    /// referenced secrets and resolving the tunnel configuration.
    ///
    /// The `in_task` argument determines whether secret reads and address
    /// resolution run in an `mz_ore::task` (i.e. a different thread) or
    /// directly in the returned future.
    pub async fn config(
        &self,
        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_postgres_util::Config, anyhow::Error> {
        let params = &storage_configuration.parameters;

        // Base client configuration: endpoint, credentials, and TLS material.
        let mut config = tokio_postgres::Config::new();
        config
            .host(&self.host)
            .port(self.port)
            .dbname(&self.database)
            .user(&self.user.get_string(in_task, secrets_reader).await?)
            .ssl_mode(self.tls_mode);
        if let Some(password) = self.password {
            let password = secrets_reader
                .read_string_in_task_if(in_task, password)
                .await?;
            config.password(password);
        }
        if let Some(tls_root_cert) = &self.tls_root_cert {
            let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
            config.ssl_root_cert(tls_root_cert.as_bytes());
        }
        if let Some(tls_identity) = &self.tls_identity {
            let cert = tls_identity
                .cert
                .get_string(in_task, secrets_reader)
                .await?;
            let key = secrets_reader
                .read_string_in_task_if(in_task, tls_identity.key)
                .await?;
            config.ssl_cert(cert.as_bytes()).ssl_key(key.as_bytes());
        }

        // Client-side timeouts and TCP keepalive tuning from LaunchDarkly-style
        // storage parameters.
        if let Some(connect_timeout) = params.pg_source_connect_timeout {
            config.connect_timeout(connect_timeout);
        }
        if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
            config.keepalives_retries(keepalives_retries);
        }
        if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
            config.keepalives_idle(keepalives_idle);
        }
        if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
            config.keepalives_interval(keepalives_interval);
        }
        if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
            config.tcp_user_timeout(tcp_user_timeout);
        }

        // Server-side session settings, passed via the startup packet's
        // `options` parameter. When `pg_source_tcp_configure_server` is set,
        // the TCP keepalive knobs above are mirrored on the server as well.
        let mut options = vec![];
        if let Some(wal_sender_timeout) = params.pg_source_wal_sender_timeout {
            options.push(format!(
                "--wal_sender_timeout={}",
                wal_sender_timeout.as_millis()
            ));
        };
        if params.pg_source_tcp_configure_server {
            if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
                options.push(format!("--tcp_keepalives_count={}", keepalives_retries));
            }
            if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
                options.push(format!(
                    "--tcp_keepalives_idle={}",
                    keepalives_idle.as_secs()
                ));
            }
            if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
                options.push(format!(
                    "--tcp_keepalives_interval={}",
                    keepalives_interval.as_secs()
                ));
            }
            if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
                options.push(format!(
                    "--tcp_user_timeout={}",
                    tcp_user_timeout.as_millis()
                ));
            }
        }
        config.options(options.join(" ").as_str());

        let tunnel = match &self.tunnel {
            Tunnel::Direct => {
                // Ensure any host we connect to is resolved to an external address.
                let resolved = resolve_address(
                    &self.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_postgres_util::TunnelConfig::Direct {
                    resolved_ips: Some(resolved),
                }
            }
            Tunnel::Ssh(SshTunnel {
                connection_id,
                connection,
            }) => {
                // The SSH key pair lives in a secret referenced by the tunnel
                // connection.
                let secret = secrets_reader
                    .read_in_task_if(in_task, *connection_id)
                    .await?;
                let key_pair = SshKeyPair::from_bytes(&secret)?;
                // Ensure any ssh-bastion host we connect to is resolved to an external address.
                let resolved = resolve_address(
                    &connection.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_postgres_util::TunnelConfig::Ssh {
                    config: SshTunnelConfig {
                        host: resolved
                            .iter()
                            .map(|a| a.to_string())
                            .collect::<BTreeSet<_>>(),
                        port: connection.port,
                        user: connection.user.clone(),
                        key_pair,
                    },
                }
            }
            Tunnel::AwsPrivatelink(connection) => {
                assert_none!(connection.port);
                mz_postgres_util::TunnelConfig::AwsPrivatelink {
                    connection_id: connection.connection_id,
                }
            }
        };

        Ok(mz_postgres_util::Config::new(
            config,
            tunnel,
            params.ssh_timeout_config,
            in_task,
        )?)
    }

    /// Connects to the server and verifies that it is configured for logical
    /// replication: `wal_level` must be at least `logical`, `max_wal_senders`
    /// must be positive, and at least two replication slots must be free.
    ///
    /// Returns the open client on success so callers can reuse it.
    pub async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<mz_postgres_util::Client, anyhow::Error> {
        let config = self
            .config(
                &storage_configuration.connection_context.secrets_reader,
                storage_configuration,
                // We are in a normal tokio context during validation, already.
                InTask::No,
            )
            .await?;
        let client = config
            .connect(
                "connection validation",
                &storage_configuration.connection_context.ssh_tunnel_manager,
            )
            .await?;

        let wal_level = mz_postgres_util::get_wal_level(&client).await?;

        if wal_level < mz_postgres_util::replication::WalLevel::Logical {
            Err(PostgresConnectionValidationError::InsufficientWalLevel { wal_level })?;
        }

        let max_wal_senders = mz_postgres_util::get_max_wal_senders(&client).await?;

        if max_wal_senders < 1 {
            Err(PostgresConnectionValidationError::ReplicationDisabled)?;
        }

        let available_replication_slots =
            mz_postgres_util::available_replication_slots(&client).await?;

        // We need 1 replication slot for the snapshots and 1 for the continuing replication
        if available_replication_slots < 2 {
            Err(
                PostgresConnectionValidationError::InsufficientReplicationSlotsAvailable {
                    count: 2,
                },
            )?;
        }

        Ok(client)
    }
}
1772
/// Errors detected while validating a [`PostgresConnection`].
#[derive(Debug, Clone, thiserror::Error)]
pub enum PostgresConnectionValidationError {
    /// Fewer than `count` replication slots are free on the server.
    #[error("PostgreSQL server has insufficient number of replication slots available")]
    InsufficientReplicationSlotsAvailable { count: usize },
    /// The server's `wal_level` does not permit logical replication.
    #[error("server must have wal_level >= logical, but has {wal_level}")]
    InsufficientWalLevel {
        wal_level: mz_postgres_util::replication::WalLevel,
    },
    /// The server does not allow any WAL senders (`max_wal_senders` < 1).
    #[error("replication disabled on server")]
    ReplicationDisabled,
}
1784
1785impl PostgresConnectionValidationError {
1786    pub fn detail(&self) -> Option<String> {
1787        match self {
1788            Self::InsufficientReplicationSlotsAvailable { count } => Some(format!(
1789                "executing this statement requires {} replication slot{}",
1790                count,
1791                if *count == 1 { "" } else { "s" }
1792            )),
1793            _ => None,
1794        }
1795    }
1796
1797    pub fn hint(&self) -> Option<String> {
1798        match self {
1799            Self::InsufficientReplicationSlotsAvailable { .. } => Some(
1800                "you might be able to wait for other sources to finish snapshotting and try again"
1801                    .into(),
1802            ),
1803            Self::ReplicationDisabled => Some("set max_wal_senders to a value > 0".into()),
1804            Self::InsufficientWalLevel { .. } => None,
1805        }
1806    }
1807}
1808
1809impl<C: ConnectionAccess> AlterCompatible for PostgresConnection<C> {
1810    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1811        let PostgresConnection {
1812            tunnel,
1813            // All non-tunnel options may change arbitrarily
1814            host: _,
1815            port: _,
1816            database: _,
1817            user: _,
1818            password: _,
1819            tls_mode: _,
1820            tls_root_cert: _,
1821            tls_identity: _,
1822        } = self;
1823
1824        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1825
1826        for (compatible, field) in compatibility_checks {
1827            if !compatible {
1828                tracing::warn!(
1829                    "PostgresConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1830                    self,
1831                    other
1832                );
1833
1834                return Err(AlterError { id });
1835            }
1836        }
1837        Ok(())
1838    }
1839}
1840
/// Specifies how to tunnel a connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Tunnel<C: ConnectionAccess = InlinedConnection> {
    /// No tunneling; connect to the upstream host directly.
    Direct,
    /// Via the specified SSH tunnel connection.
    Ssh(SshTunnel<C>),
    /// Via the specified AWS PrivateLink connection.
    AwsPrivatelink(AwsPrivatelink),
}
1851
1852impl<R: ConnectionResolver> IntoInlineConnection<Tunnel, R> for Tunnel<ReferencedConnection> {
1853    fn into_inline_connection(self, r: R) -> Tunnel {
1854        match self {
1855            Tunnel::Direct => Tunnel::Direct,
1856            Tunnel::Ssh(ssh) => Tunnel::Ssh(ssh.into_inline_connection(r)),
1857            Tunnel::AwsPrivatelink(awspl) => Tunnel::AwsPrivatelink(awspl),
1858        }
1859    }
1860}
1861
1862impl<C: ConnectionAccess> AlterCompatible for Tunnel<C> {
1863    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1864        let compatible = match (self, other) {
1865            (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o).is_ok(),
1866            (s, o) => s == o,
1867        };
1868
1869        if !compatible {
1870            tracing::warn!(
1871                "Tunnel incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
1872                self,
1873                other
1874            );
1875
1876            return Err(AlterError { id });
1877        }
1878
1879        Ok(())
1880    }
1881}
1882
/// Specifies which MySQL SSL Mode to use:
/// <https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-mode>
/// This is not available as an enum in the mysql-async crate, so we define our own.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum MySqlSslMode {
    /// Do not use TLS.
    Disabled,
    /// Use TLS, but do not verify the server's certificate or hostname.
    Required,
    /// Use TLS and verify the server's certificate, but not its hostname.
    VerifyCa,
    /// Use TLS and verify both the server's certificate and hostname.
    VerifyIdentity,
}
1893
/// A connection to a MySQL server.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct MySqlConnection<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// The username to authenticate as.
    pub user: StringOrSecret,
    /// The ID of the secret containing the password used for authentication,
    /// if any.
    pub password: Option<CatalogItemId>,
    /// A tunnel through which to route traffic.
    pub tunnel: Tunnel<C>,
    /// Whether to use TLS for encryption, verify the server's certificate, and identity.
    pub tls_mode: MySqlSslMode,
    /// An optional root TLS certificate in PEM format, to verify the server's
    /// identity.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication.
    pub tls_identity: Option<TlsIdentity>,
    /// Reference to the AWS connection information to be used for IAM authentication and
    /// assuming AWS roles.
    pub aws_connection: Option<AwsConnectionReference<C>>,
}
1918
1919impl<R: ConnectionResolver> IntoInlineConnection<MySqlConnection, R>
1920    for MySqlConnection<ReferencedConnection>
1921{
1922    fn into_inline_connection(self, r: R) -> MySqlConnection {
1923        let MySqlConnection {
1924            host,
1925            port,
1926            user,
1927            password,
1928            tunnel,
1929            tls_mode,
1930            tls_root_cert,
1931            tls_identity,
1932            aws_connection,
1933        } = self;
1934
1935        MySqlConnection {
1936            host,
1937            port,
1938            user,
1939            password,
1940            tunnel: tunnel.into_inline_connection(&r),
1941            tls_mode,
1942            tls_root_cert,
1943            tls_identity,
1944            aws_connection: aws_connection.map(|aws| aws.into_inline_connection(&r)),
1945        }
1946    }
1947}
1948
impl<C: ConnectionAccess> MySqlConnection<C> {
    /// Reports whether this connection type is validated by default
    /// (always true for MySQL connections).
    fn validate_by_default(&self) -> bool {
        true
    }
}
1954
impl MySqlConnection<InlinedConnection> {
    /// Builds a [`mz_mysql_util::Config`] for this connection, reading any
    /// referenced secrets and resolving the tunnel and AWS configuration.
    ///
    /// The `in_task` argument determines whether secret reads and address
    /// resolution run in an `mz_ore::task` (i.e. a different thread) or
    /// directly in the returned future.
    pub async fn config(
        &self,
        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_mysql_util::Config, anyhow::Error> {
        // TODO(roshan): Set appropriate connection timeouts
        let mut opts = mysql_async::OptsBuilder::default()
            .ip_or_hostname(&self.host)
            .tcp_port(self.port)
            .user(Some(&self.user.get_string(in_task, secrets_reader).await?));

        if let Some(password) = self.password {
            let password = secrets_reader
                .read_string_in_task_if(in_task, password)
                .await?;
            opts = opts.pass(Some(password));
        }

        // Our `MySqlSslMode` enum matches the official MySQL Client `--ssl-mode` parameter values
        // which uses opt-in security features (SSL, CA verification, & Identity verification).
        // The mysql_async crate `SslOpts` struct uses an opt-out mechanism for each of these, so
        // we need to appropriately disable features to match the intent of each enum value.
        let mut ssl_opts = match self.tls_mode {
            MySqlSslMode::Disabled => None,
            MySqlSslMode::Required => Some(
                mysql_async::SslOpts::default()
                    .with_danger_accept_invalid_certs(true)
                    .with_danger_skip_domain_validation(true),
            ),
            MySqlSslMode::VerifyCa => {
                Some(mysql_async::SslOpts::default().with_danger_skip_domain_validation(true))
            }
            MySqlSslMode::VerifyIdentity => Some(mysql_async::SslOpts::default()),
        };

        // A root certificate only matters in modes that verify the server's
        // certificate; in other modes it is ignored.
        if matches!(
            self.tls_mode,
            MySqlSslMode::VerifyCa | MySqlSslMode::VerifyIdentity
        ) {
            if let Some(tls_root_cert) = &self.tls_root_cert {
                let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
                ssl_opts = ssl_opts.map(|opts| {
                    opts.with_root_certs(vec![tls_root_cert.as_bytes().to_vec().into()])
                });
            }
        }

        if let Some(identity) = &self.tls_identity {
            let key = secrets_reader
                .read_string_in_task_if(in_task, identity.key)
                .await?;
            let cert = identity.cert.get_string(in_task, secrets_reader).await?;
            // mysql_async expects the client identity as a PKCS#12 DER
            // archive, so convert the PEM key/cert pair first.
            let mut archive = mz_tls_util::pkcs12der_from_pem(key.as_bytes(), cert.as_bytes())?;
            let der = std::mem::take(&mut archive.der);
            let pass = std::mem::take(&mut archive.pass);

            // Add client identity to SSLOpts
            ssl_opts = ssl_opts.map(|opts| {
                opts.with_client_identity(Some(
                    mysql_async::ClientIdentity::new(der.into()).with_password(pass),
                ))
            });
        }

        opts = opts.ssl_opts(ssl_opts);

        let tunnel = match &self.tunnel {
            Tunnel::Direct => {
                // Ensure any host we connect to is resolved to an external address.
                let resolved = resolve_address(
                    &self.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_mysql_util::TunnelConfig::Direct {
                    resolved_ips: Some(resolved),
                }
            }
            Tunnel::Ssh(SshTunnel {
                connection_id,
                connection,
            }) => {
                // The SSH key pair lives in a secret referenced by the tunnel
                // connection.
                let secret = secrets_reader
                    .read_in_task_if(in_task, *connection_id)
                    .await?;
                let key_pair = SshKeyPair::from_bytes(&secret)?;
                // Ensure any ssh-bastion host we connect to is resolved to an external address.
                let resolved = resolve_address(
                    &connection.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_mysql_util::TunnelConfig::Ssh {
                    config: SshTunnelConfig {
                        host: resolved
                            .iter()
                            .map(|a| a.to_string())
                            .collect::<BTreeSet<_>>(),
                        port: connection.port,
                        user: connection.user.clone(),
                        key_pair,
                    },
                }
            }
            Tunnel::AwsPrivatelink(connection) => {
                assert_none!(connection.port);
                mz_mysql_util::TunnelConfig::AwsPrivatelink {
                    connection_id: connection.connection_id,
                }
            }
        };

        // Load the AWS SDK config for IAM authentication, if an AWS
        // connection is referenced.
        let aws_config = match self.aws_connection.as_ref() {
            None => None,
            Some(aws_ref) => Some(
                aws_ref
                    .connection
                    .load_sdk_config(
                        &storage_configuration.connection_context,
                        aws_ref.connection_id,
                        in_task,
                    )
                    .await?,
            ),
        };

        Ok(mz_mysql_util::Config::new(
            opts,
            tunnel,
            storage_configuration.parameters.ssh_timeout_config,
            in_task,
            storage_configuration
                .parameters
                .mysql_source_timeouts
                .clone(),
            aws_config,
        )?)
    }

    /// Connects to the server and verifies that it is configured for
    /// row-based, GTID-consistent replication with preserved commit order,
    /// collecting all misconfigured system settings into a single error.
    ///
    /// Returns the open connection on success so callers can reuse it.
    pub async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<MySqlConn, MySqlConnectionValidationError> {
        let config = self
            .config(
                &storage_configuration.connection_context.secrets_reader,
                storage_configuration,
                // We are in a normal tokio context during validation, already.
                InTask::No,
            )
            .await?;
        let mut conn = config
            .connect(
                "connection validation",
                &storage_configuration.connection_context.ssh_tunnel_manager,
            )
            .await?;

        // Check if the MySQL database is configured to allow row-based consistent GTID replication
        let mut setting_errors = vec![];
        let gtid_res = mz_mysql_util::ensure_gtid_consistency(&mut conn).await;
        let binlog_res = mz_mysql_util::ensure_full_row_binlog_format(&mut conn).await;
        let order_res = mz_mysql_util::ensure_replication_commit_order(&mut conn).await;
        for res in [gtid_res, binlog_res, order_res] {
            match res {
                Err(MySqlError::InvalidSystemSetting {
                    setting,
                    expected,
                    actual,
                }) => {
                    // Collect setting mismatches so all of them can be
                    // reported at once; other errors abort immediately.
                    setting_errors.push((setting, expected, actual));
                }
                Err(err) => Err(err)?,
                Ok(()) => {}
            }
        }
        if !setting_errors.is_empty() {
            Err(MySqlConnectionValidationError::ReplicationSettingsError(
                setting_errors,
            ))?;
        }

        Ok(conn)
    }
}
2143
/// Errors detected while validating a [`MySqlConnection`].
#[derive(Debug, thiserror::Error)]
pub enum MySqlConnectionValidationError {
    /// One or more system settings are misconfigured for replication; each
    /// entry is `(setting, expected, actual)`.
    #[error("Invalid MySQL system replication settings")]
    ReplicationSettingsError(Vec<(String, String, String)>),
    /// An error reported by the MySQL client.
    #[error(transparent)]
    Client(#[from] MySqlError),
    /// Any other error.
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}
2153
2154impl MySqlConnectionValidationError {
2155    pub fn detail(&self) -> Option<String> {
2156        match self {
2157            Self::ReplicationSettingsError(settings) => Some(format!(
2158                "Invalid MySQL system replication settings: {}",
2159                itertools::join(
2160                    settings.iter().map(|(setting, expected, actual)| format!(
2161                        "{}: expected {}, got {}",
2162                        setting, expected, actual
2163                    )),
2164                    "; "
2165                )
2166            )),
2167            _ => None,
2168        }
2169    }
2170
2171    pub fn hint(&self) -> Option<String> {
2172        match self {
2173            Self::ReplicationSettingsError(_) => {
2174                Some("Set the necessary MySQL database system settings.".into())
2175            }
2176            _ => None,
2177        }
2178    }
2179}
2180
2181impl<C: ConnectionAccess> AlterCompatible for MySqlConnection<C> {
2182    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2183        let MySqlConnection {
2184            tunnel,
2185            // All non-tunnel options may change arbitrarily
2186            host: _,
2187            port: _,
2188            user: _,
2189            password: _,
2190            tls_mode: _,
2191            tls_root_cert: _,
2192            tls_identity: _,
2193            aws_connection: _,
2194        } = self;
2195
2196        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2197
2198        for (compatible, field) in compatibility_checks {
2199            if !compatible {
2200                tracing::warn!(
2201                    "MySqlConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2202                    self,
2203                    other
2204                );
2205
2206                return Err(AlterError { id });
2207            }
2208        }
2209        Ok(())
2210    }
2211}
2212
/// Details how to connect to an instance of Microsoft SQL Server.
///
/// For specifics of connecting to SQL Server for purposes of creating a
/// Materialize Source, see [`SqlServerSourceConnection`] which wraps this type.
///
/// [`SqlServerSourceConnection`]: crate::sources::SqlServerSourceConnection
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SqlServerConnectionDetails<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// Database we should connect to.
    pub database: String,
    /// The username to authenticate as.
    pub user: StringOrSecret,
    /// Password used for authentication.
    pub password: CatalogItemId,
    /// A tunnel through which to route traffic.
    pub tunnel: Tunnel<C>,
    /// Level of encryption to use for the connection.
    pub encryption: mz_sql_server_util::config::EncryptionLevel,
    /// Certificate validation policy
    pub certificate_validation_policy: mz_sql_server_util::config::CertificateValidationPolicy,
    /// TLS CA Certificate in PEM format.
    ///
    /// Must be present when `certificate_validation_policy` is `VerifyCA`.
    pub tls_root_cert: Option<StringOrSecret>,
}
2240
impl<C: ConnectionAccess> SqlServerConnectionDetails<C> {
    /// Reports whether this connection type is validated by default
    /// (always true for SQL Server connections).
    fn validate_by_default(&self) -> bool {
        true
    }
}
2246
2247impl SqlServerConnectionDetails<InlinedConnection> {
    /// Attempts to open a connection to the upstream SQL Server instance.
    ///
    /// Also verifies that the instance supports CDC: the database must have
    /// CDC enabled, snapshot isolation must be allowed, and the SQL Server
    /// Agent must be running. All misconfigured settings are reported
    /// together in a single error. Returns the open client on success.
    pub async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<mz_sql_server_util::Client, anyhow::Error> {
        let config = self
            .resolve_config(
                &storage_configuration.connection_context.secrets_reader,
                storage_configuration,
                // We are in a normal tokio context during validation, already.
                InTask::No,
            )
            .await?;
        tracing::debug!(?config, "Validating SQL Server connection");

        let mut client = mz_sql_server_util::Client::connect(config).await?;

        // Ensure the upstream SQL Server instance is configured to allow CDC.
        //
        // Run all of the checks necessary and collect the errors to provide the best
        // guidance as to which system settings need to be enabled.
        let mut replication_errors = vec![];
        for error in [
            mz_sql_server_util::inspect::ensure_database_cdc_enabled(&mut client).await,
            mz_sql_server_util::inspect::ensure_snapshot_isolation_enabled(&mut client).await,
            mz_sql_server_util::inspect::ensure_sql_server_agent_running(&mut client).await,
        ] {
            match error {
                Err(mz_sql_server_util::SqlServerError::InvalidSystemSetting {
                    name,
                    expected,
                    actual,
                }) => replication_errors.push((name, expected, actual)),
                Err(other) => Err(other)?,
                Ok(()) => (),
            }
        }
        if !replication_errors.is_empty() {
            Err(SqlServerConnectionValidationError::ReplicationSettingsError(replication_errors))?;
        }

        Ok(client)
    }
2291
2292    /// Resolve all of the connection details (e.g. read from the [`SecretsReader`])
2293    /// so the returned [`Config`] can be used to open a connection with the
2294    /// upstream system.
2295    ///
2296    /// The provided [`InTask`] argument determines whether any I/O is run in an
2297    /// [`mz_ore::task`] (i.e. a different thread) or directly in the returned
2298    /// future. The main goal here is to prevent running I/O in timely threads.
2299    ///
2300    /// [`Config`]: mz_sql_server_util::Config
2301    pub async fn resolve_config(
2302        &self,
2303        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
2304        storage_configuration: &StorageConfiguration,
2305        in_task: InTask,
2306    ) -> Result<mz_sql_server_util::Config, anyhow::Error> {
2307        let dyncfg = storage_configuration.config_set();
2308        let mut inner_config = tiberius::Config::new();
2309
2310        // Setup default connection params.
2311        inner_config.host(&self.host);
2312        inner_config.port(self.port);
2313        inner_config.database(self.database.clone());
2314        inner_config.encryption(self.encryption.into());
2315        match self.certificate_validation_policy {
2316            mz_sql_server_util::config::CertificateValidationPolicy::TrustAll => {
2317                inner_config.trust_cert()
2318            }
2319            mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA => {
2320                inner_config.trust_cert_ca_pem(
2321                    self.tls_root_cert
2322                        .as_ref()
2323                        .unwrap()
2324                        .get_string(in_task, secrets_reader)
2325                        .await
2326                        .context("ca certificate")?,
2327                );
2328            }
2329            mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem => (), // no-op
2330        }
2331
2332        inner_config.application_name("materialize");
2333
2334        // Read our auth settings from
2335        let user = self
2336            .user
2337            .get_string(in_task, secrets_reader)
2338            .await
2339            .context("username")?;
2340        let password = secrets_reader
2341            .read_string_in_task_if(in_task, self.password)
2342            .await
2343            .context("password")?;
2344        // TODO(sql_server3): Support other methods of authentication besides
2345        // username and password.
2346        inner_config.authentication(tiberius::AuthMethod::sql_server(user, password));
2347
2348        // Prevent users from probing our internal network ports by trying to
2349        // connect to localhost, or another non-external IP.
2350        let enfoce_external_addresses = ENFORCE_EXTERNAL_ADDRESSES.get(dyncfg);
2351
2352        let tunnel = match &self.tunnel {
2353            Tunnel::Direct => mz_sql_server_util::config::TunnelConfig::Direct,
2354            Tunnel::Ssh(SshTunnel {
2355                connection_id,
2356                connection: ssh_connection,
2357            }) => {
2358                let secret = secrets_reader
2359                    .read_in_task_if(in_task, *connection_id)
2360                    .await
2361                    .context("ssh secret")?;
2362                let key_pair = SshKeyPair::from_bytes(&secret).context("ssh key pair")?;
2363                // Ensure any SSH-bastion host we connect to is resolved to an
2364                // external address.
2365                let addresses = resolve_address(&ssh_connection.host, enfoce_external_addresses)
2366                    .await
2367                    .context("ssh tunnel")?;
2368
2369                let config = SshTunnelConfig {
2370                    host: addresses.into_iter().map(|a| a.to_string()).collect(),
2371                    port: ssh_connection.port,
2372                    user: ssh_connection.user.clone(),
2373                    key_pair,
2374                };
2375                mz_sql_server_util::config::TunnelConfig::Ssh {
2376                    config,
2377                    manager: storage_configuration
2378                        .connection_context
2379                        .ssh_tunnel_manager
2380                        .clone(),
2381                    timeout: storage_configuration.parameters.ssh_timeout_config.clone(),
2382                    host: self.host.clone(),
2383                    port: self.port,
2384                }
2385            }
2386            Tunnel::AwsPrivatelink(private_link_connection) => {
2387                assert_none!(private_link_connection.port);
2388                mz_sql_server_util::config::TunnelConfig::AwsPrivatelink {
2389                    connection_id: private_link_connection.connection_id,
2390                    port: self.port,
2391                }
2392            }
2393        };
2394
2395        Ok(mz_sql_server_util::Config::new(
2396            inner_config,
2397            tunnel,
2398            in_task,
2399        ))
2400    }
2401}
2402
/// Errors produced while validating a SQL Server connection.
#[derive(Debug, Clone, thiserror::Error)]
pub enum SqlServerConnectionValidationError {
    /// One or more server-side replication settings did not have the value we
    /// require. Each tuple is `(setting name, expected value, actual value)`.
    #[error("Invalid SQL Server system replication settings")]
    ReplicationSettingsError(Vec<(String, String, String)>),
}
2408
2409impl SqlServerConnectionValidationError {
2410    pub fn detail(&self) -> Option<String> {
2411        match self {
2412            Self::ReplicationSettingsError(settings) => Some(format!(
2413                "Invalid SQL Server system replication settings: {}",
2414                itertools::join(
2415                    settings.iter().map(|(setting, expected, actual)| format!(
2416                        "{}: expected {}, got {}",
2417                        setting, expected, actual
2418                    )),
2419                    "; "
2420                )
2421            )),
2422        }
2423    }
2424
2425    pub fn hint(&self) -> Option<String> {
2426        match self {
2427            _ => None,
2428        }
2429    }
2430}
2431
2432impl<R: ConnectionResolver> IntoInlineConnection<SqlServerConnectionDetails, R>
2433    for SqlServerConnectionDetails<ReferencedConnection>
2434{
2435    fn into_inline_connection(self, r: R) -> SqlServerConnectionDetails {
2436        let SqlServerConnectionDetails {
2437            host,
2438            port,
2439            database,
2440            user,
2441            password,
2442            tunnel,
2443            encryption,
2444            certificate_validation_policy,
2445            tls_root_cert,
2446        } = self;
2447
2448        SqlServerConnectionDetails {
2449            host,
2450            port,
2451            database,
2452            user,
2453            password,
2454            tunnel: tunnel.into_inline_connection(&r),
2455            encryption,
2456            certificate_validation_policy,
2457            tls_root_cert,
2458        }
2459    }
2460}
2461
2462impl<C: ConnectionAccess> AlterCompatible for SqlServerConnectionDetails<C> {
2463    fn alter_compatible(
2464        &self,
2465        id: mz_repr::GlobalId,
2466        other: &Self,
2467    ) -> Result<(), crate::controller::AlterError> {
2468        let SqlServerConnectionDetails {
2469            tunnel,
2470            // TODO(sql_server2): Figure out how these variables are allowed to change.
2471            host: _,
2472            port: _,
2473            database: _,
2474            user: _,
2475            password: _,
2476            encryption: _,
2477            certificate_validation_policy: _,
2478            tls_root_cert: _,
2479        } = self;
2480
2481        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2482
2483        for (compatible, field) in compatibility_checks {
2484            if !compatible {
2485                tracing::warn!(
2486                    "SqlServerConnectionDetails incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2487                    self,
2488                    other
2489                );
2490
2491                return Err(AlterError { id });
2492            }
2493        }
2494        Ok(())
2495    }
2496}
2497
/// A connection to an SSH tunnel.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SshConnection {
    /// Host of the SSH bastion; resolved to an external address before use.
    pub host: String,
    /// Port on which the SSH bastion listens.
    pub port: u16,
    /// Username to connect to the SSH bastion as.
    pub user: String,
}
2505
2506use self::inline::{
2507    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
2508    ReferencedConnection,
2509};
2510
impl AlterCompatible for SshConnection {
    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        // Every element of the SSH connection is configurable, so any change
        // is compatible with `ALTER CONNECTION`; this check never fails.
        Ok(())
    }
}
2517
/// Specifies an AWS PrivateLink service for a [`Tunnel`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelink {
    /// The ID of the connection to the AWS PrivateLink service.
    pub connection_id: CatalogItemId,
    /// The availability zone to use when connecting to the AWS PrivateLink service.
    pub availability_zone: Option<String>,
    /// The port to use when connecting to the AWS PrivateLink service, if
    /// different from the port in [`KafkaBroker::address`].
    pub port: Option<u16>,
}
2529
2530impl AlterCompatible for AwsPrivatelink {
2531    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2532        let AwsPrivatelink {
2533            connection_id,
2534            availability_zone: _,
2535            port: _,
2536        } = self;
2537
2538        let compatibility_checks = [(connection_id == &other.connection_id, "connection_id")];
2539
2540        for (compatible, field) in compatibility_checks {
2541            if !compatible {
2542                tracing::warn!(
2543                    "AwsPrivatelink incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2544                    self,
2545                    other
2546                );
2547
2548                return Err(AlterError { id });
2549            }
2550        }
2551
2552        Ok(())
2553    }
2554}
2555
/// Specifies an SSH tunnel connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SshTunnel<C: ConnectionAccess = InlinedConnection> {
    /// The ID of the SSH connection. The same ID identifies the secret that
    /// stores the connection's SSH key pair.
    pub connection_id: CatalogItemId,
    /// The SSH connection object, either inlined or referenced depending on `C`.
    pub connection: C::Ssh,
}
2564
2565impl<R: ConnectionResolver> IntoInlineConnection<SshTunnel, R> for SshTunnel<ReferencedConnection> {
2566    fn into_inline_connection(self, r: R) -> SshTunnel {
2567        let SshTunnel {
2568            connection,
2569            connection_id,
2570        } = self;
2571
2572        SshTunnel {
2573            connection: r.resolve_connection(connection).unwrap_ssh(),
2574            connection_id,
2575        }
2576    }
2577}
2578
2579impl SshTunnel<InlinedConnection> {
2580    /// Like [`SshTunnelConfig::connect`], but the SSH key is loaded from a
2581    /// secret.
2582    async fn connect(
2583        &self,
2584        storage_configuration: &StorageConfiguration,
2585        remote_host: &str,
2586        remote_port: u16,
2587        in_task: InTask,
2588    ) -> Result<ManagedSshTunnelHandle, anyhow::Error> {
2589        // Ensure any ssh-bastion host we connect to is resolved to an external address.
2590        let resolved = resolve_address(
2591            &self.connection.host,
2592            ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2593        )
2594        .await?;
2595        storage_configuration
2596            .connection_context
2597            .ssh_tunnel_manager
2598            .connect(
2599                SshTunnelConfig {
2600                    host: resolved
2601                        .iter()
2602                        .map(|a| a.to_string())
2603                        .collect::<BTreeSet<_>>(),
2604                    port: self.connection.port,
2605                    user: self.connection.user.clone(),
2606                    key_pair: SshKeyPair::from_bytes(
2607                        &storage_configuration
2608                            .connection_context
2609                            .secrets_reader
2610                            .read_in_task_if(in_task, self.connection_id)
2611                            .await?,
2612                    )?,
2613                },
2614                remote_host,
2615                remote_port,
2616                storage_configuration.parameters.ssh_timeout_config,
2617                in_task,
2618            )
2619            .await
2620    }
2621}
2622
2623impl<C: ConnectionAccess> AlterCompatible for SshTunnel<C> {
2624    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2625        let SshTunnel {
2626            connection_id,
2627            connection,
2628        } = self;
2629
2630        let compatibility_checks = [
2631            (connection_id == &other.connection_id, "connection_id"),
2632            (
2633                connection.alter_compatible(id, &other.connection).is_ok(),
2634                "connection",
2635            ),
2636        ];
2637
2638        for (compatible, field) in compatibility_checks {
2639            if !compatible {
2640                tracing::warn!(
2641                    "SshTunnel incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2642                    self,
2643                    other
2644                );
2645
2646                return Err(AlterError { id });
2647            }
2648        }
2649
2650        Ok(())
2651    }
2652}
2653
2654impl SshConnection {
2655    #[allow(clippy::unused_async)]
2656    async fn validate(
2657        &self,
2658        id: CatalogItemId,
2659        storage_configuration: &StorageConfiguration,
2660    ) -> Result<(), anyhow::Error> {
2661        let secret = storage_configuration
2662            .connection_context
2663            .secrets_reader
2664            .read_in_task_if(
2665                // We are in a normal tokio context during validation, already.
2666                InTask::No,
2667                id,
2668            )
2669            .await?;
2670        let key_pair = SshKeyPair::from_bytes(&secret)?;
2671
2672        // Ensure any ssh-bastion host we connect to is resolved to an external address.
2673        let resolved = resolve_address(
2674            &self.host,
2675            ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2676        )
2677        .await?;
2678
2679        let config = SshTunnelConfig {
2680            host: resolved
2681                .iter()
2682                .map(|a| a.to_string())
2683                .collect::<BTreeSet<_>>(),
2684            port: self.port,
2685            user: self.user.clone(),
2686            key_pair,
2687        };
2688        // Note that we do NOT use the `SshTunnelManager` here, as we want to validate that we
2689        // can actually create a new connection to the ssh bastion, without tunneling.
2690        config
2691            .validate(storage_configuration.parameters.ssh_timeout_config)
2692            .await
2693    }
2694
2695    fn validate_by_default(&self) -> bool {
2696        false
2697    }
2698}
2699
2700impl AwsPrivatelinkConnection {
2701    #[allow(clippy::unused_async)]
2702    async fn validate(
2703        &self,
2704        id: CatalogItemId,
2705        storage_configuration: &StorageConfiguration,
2706    ) -> Result<(), anyhow::Error> {
2707        let Some(ref cloud_resource_reader) = storage_configuration
2708            .connection_context
2709            .cloud_resource_reader
2710        else {
2711            return Err(anyhow!("AWS PrivateLink connections are unsupported"));
2712        };
2713
2714        // No need to optionally run this in a task, as we are just validating from envd.
2715        let status = cloud_resource_reader.read(id).await?;
2716
2717        let availability = status
2718            .conditions
2719            .as_ref()
2720            .and_then(|conditions| conditions.iter().find(|c| c.type_ == "Available"));
2721
2722        match availability {
2723            Some(condition) if condition.status == "True" => Ok(()),
2724            Some(condition) => Err(anyhow!("{}", condition.message)),
2725            None => Err(anyhow!("Endpoint availability is unknown")),
2726        }
2727    }
2728
2729    fn validate_by_default(&self) -> bool {
2730        false
2731    }
2732}