mz_storage_types/connections.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Connection types.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::net::SocketAddr;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use aws_credential_types::provider::ProvideCredentials;
use iceberg::Catalog;
use iceberg::CatalogBuilder;
use iceberg::io::{
    AwsCredential, AwsCredentialLoad, CustomAwsCredentialLoader, S3_ACCESS_KEY_ID,
    S3_DISABLE_EC2_METADATA, S3_REGION, S3_SECRET_ACCESS_KEY,
};
use iceberg_catalog_rest::{
    REST_CATALOG_PROP_URI, REST_CATALOG_PROP_WAREHOUSE, RestCatalogBuilder,
};
use itertools::Itertools;
use mz_ccsr::tls::{Certificate, Identity};
use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceReader, vpc_endpoint_host};
use mz_dyncfg::ConfigSet;
use mz_kafka_util::client::{
    BrokerAddr, BrokerRewrite, MzClientContext, MzKafkaError, TunnelConfig, TunnelingClientContext,
};
use mz_mysql_util::{MySqlConn, MySqlError};
use mz_ore::assert_none;
use mz_ore::error::ErrorExt;
use mz_ore::future::{InTask, OreFutureExt};
use mz_ore::netio::resolve_address;
use mz_ore::num::NonNeg;
use mz_repr::{CatalogItemId, GlobalId};
use mz_secrets::SecretsReader;
use mz_ssh_util::keys::SshKeyPair;
use mz_ssh_util::tunnel::SshTunnelConfig;
use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager};
use mz_tls_util::Pkcs12Archive;
use mz_tracing::CloneableEnvFilter;
use rdkafka::ClientContext;
use rdkafka::config::FromClientConfigAndContext;
use rdkafka::consumer::{BaseConsumer, Consumer};
use regex::Regex;
use serde::{Deserialize, Deserializer, Serialize};
use tokio::net;
use tokio::runtime::Handle;
use tokio_postgres::config::SslMode;
use tracing::{debug, warn};
use url::Url;

use crate::AlterCompatible;
use crate::configuration::StorageConfiguration;
use crate::connections::aws::{
    AwsAuth, AwsConnection, AwsConnectionReference, AwsConnectionValidationError,
};
use crate::connections::string_or_secret::StringOrSecret;
use crate::controller::AlterError;
use crate::dyncfgs::{
    ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES,
    KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM, KAFKA_RECONNECT_BACKOFF,
    KAFKA_RECONNECT_BACKOFF_MAX, KAFKA_RETRY_BACKOFF, KAFKA_RETRY_BACKOFF_MAX,
};
use crate::errors::{ContextCreationError, CsrConnectError};

pub mod aws;
pub mod inline;
pub mod string_or_secret;

const REST_CATALOG_PROP_SCOPE: &str = "scope";
const REST_CATALOG_PROP_CREDENTIAL: &str = "credential";

/// A credential loader that wraps an aws-sdk-rust credentials provider for use with
/// iceberg/OpenDAL. This allows us to provide refreshable credentials from the AWS SDK
/// credential chain (including the full assume role chain) to OpenDAL's S3 implementation.
///
/// We use this instead of OpenDAL's built-in assume role support because Materialize
/// has a runtime-defined credential chain (ambient → jump role → user role with external ID)
/// that can't be expressed via OpenDAL's static configuration properties.
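///
/// A hedged sketch of how this is wired up (mirroring `connect_s3tables`
/// below); `aws_config` is assumed to be an already-loaded SDK config and
/// `catalog` an already-built REST catalog:
///
/// ```ignore
/// let provider = aws_config
///     .credentials_provider()
///     .expect("credentials provider configured");
/// let loader = CustomAwsCredentialLoader::new(Arc::new(AwsSdkCredentialLoader::new(provider)));
/// let catalog = catalog.with_file_io_extension(loader);
/// ```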
struct AwsSdkCredentialLoader {
    /// The underlying AWS SDK credentials provider. For assume role auth, this provider
    /// already handles the full chain: ambient creds -> jump role -> user role.
    provider: aws_credential_types::provider::SharedCredentialsProvider,
}

impl AwsSdkCredentialLoader {
    fn new(provider: aws_credential_types::provider::SharedCredentialsProvider) -> Self {
        Self { provider }
    }
}

#[async_trait]
impl AwsCredentialLoad for AwsSdkCredentialLoader {
    async fn load_credential(
        &self,
        _client: reqwest::Client,
    ) -> anyhow::Result<Option<AwsCredential>> {
        let creds = self
            .provider
            .provide_credentials()
            .await
            .context("failed to load AWS credentials from SDK provider")?;

        Ok(Some(AwsCredential {
            access_key_id: creds.access_key_id().to_string(),
            secret_access_key: creds.secret_access_key().to_string(),
            session_token: creds.session_token().map(|s| s.to_string()),
            expires_in: creds.expiry().map(|t| t.into()),
        }))
    }
}

/// An extension trait for [`SecretsReader`]
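///
/// A minimal usage sketch, assuming `reader: Arc<dyn SecretsReader>` and a
/// secret identified by `id: CatalogItemId`:
///
/// ```ignore
/// let password = reader.read_string_in_task_if(InTask::Yes, id).await?;
/// ```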
#[async_trait::async_trait]
trait SecretsReaderExt {
    /// `SecretsReader::read`, but optionally run in a task.
    async fn read_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<Vec<u8>, anyhow::Error>;

    /// `SecretsReader::read_string`, but optionally run in a task.
    async fn read_string_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<String, anyhow::Error>;
}

#[async_trait::async_trait]
impl SecretsReaderExt for Arc<dyn SecretsReader> {
    async fn read_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<Vec<u8>, anyhow::Error> {
        let sr = Arc::clone(self);
        async move { sr.read(id).await }
            .run_in_task_if(in_task, || "secrets_reader_read".to_string())
            .await
    }
    async fn read_string_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<String, anyhow::Error> {
        let sr = Arc::clone(self);
        async move { sr.read_string(id).await }
            .run_in_task_if(in_task, || "secrets_reader_read".to_string())
            .await
    }
}

/// Extra context to pass through when instantiating a connection for a source
/// or sink.
///
/// Should be kept cheaply cloneable.
#[derive(Debug, Clone)]
pub struct ConnectionContext {
    /// An opaque identifier for the environment in which this process is
    /// running.
    ///
    /// The storage layer is intentionally unaware of the structure within this
    /// identifier. Higher layers of the stack can make use of that structure,
    /// but the storage layer should be oblivious to it.
    pub environment_id: String,
    /// The level for librdkafka's logs.
    pub librdkafka_log_level: tracing::Level,
    /// A prefix for an external ID to use for all AWS AssumeRole operations.
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    /// The ARN for a Materialize-controlled role to assume before assuming
    /// a customer's requested role for an AWS connection.
    pub aws_connection_role_arn: Option<String>,
    /// A secrets reader.
    pub secrets_reader: Arc<dyn SecretsReader>,
    /// A cloud resource reader, if supported in this configuration.
    pub cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
    /// A manager for SSH tunnels.
    pub ssh_tunnel_manager: SshTunnelManager,
}

impl ConnectionContext {
    /// Constructs a new connection context from command line arguments.
    ///
    /// **WARNING:** it is critical for security that the `aws_external_id` be
    /// provided by the operator of the Materialize service (i.e., via a CLI
    /// argument or environment variable) and not the end user of Materialize
    /// (e.g., via a configuration option in a SQL statement). See
    /// [`AwsExternalIdPrefix`] for details.
    pub fn from_cli_args(
        environment_id: String,
        startup_log_level: &CloneableEnvFilter,
        aws_external_id_prefix: Option<AwsExternalIdPrefix>,
        aws_connection_role_arn: Option<String>,
        secrets_reader: Arc<dyn SecretsReader>,
        cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
    ) -> ConnectionContext {
        ConnectionContext {
            environment_id,
            librdkafka_log_level: mz_ore::tracing::crate_level(
                &startup_log_level.clone().into(),
                "librdkafka",
            ),
            aws_external_id_prefix,
            aws_connection_role_arn,
            secrets_reader,
            cloud_resource_reader,
            ssh_tunnel_manager: SshTunnelManager::default(),
        }
    }

    /// Constructs a new connection context for usage in tests.
    pub fn for_tests(secrets_reader: Arc<dyn SecretsReader>) -> ConnectionContext {
        ConnectionContext {
            environment_id: "test-environment-id".into(),
            librdkafka_log_level: tracing::Level::INFO,
            aws_external_id_prefix: Some(
                AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable(
                    "test-aws-external-id-prefix",
                )
                .expect("infallible"),
            ),
            aws_connection_role_arn: Some(
                "arn:aws:iam::123456789000:role/MaterializeConnection".into(),
            ),
            secrets_reader,
            cloud_resource_reader: None,
            ssh_tunnel_manager: SshTunnelManager::default(),
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Connection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaConnection<C>),
    Csr(CsrConnection<C>),
    Postgres(PostgresConnection<C>),
    Ssh(SshConnection),
    Aws(AwsConnection),
    AwsPrivatelink(AwsPrivatelinkConnection),
    MySql(MySqlConnection<C>),
    SqlServer(SqlServerConnectionDetails<C>),
    IcebergCatalog(IcebergCatalogConnection<C>),
}

impl<R: ConnectionResolver> IntoInlineConnection<Connection, R>
    for Connection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> Connection {
        match self {
            Connection::Kafka(kafka) => Connection::Kafka(kafka.into_inline_connection(r)),
            Connection::Csr(csr) => Connection::Csr(csr.into_inline_connection(r)),
            Connection::Postgres(pg) => Connection::Postgres(pg.into_inline_connection(r)),
            Connection::Ssh(ssh) => Connection::Ssh(ssh),
            Connection::Aws(aws) => Connection::Aws(aws),
            Connection::AwsPrivatelink(awspl) => Connection::AwsPrivatelink(awspl),
            Connection::MySql(mysql) => Connection::MySql(mysql.into_inline_connection(r)),
            Connection::SqlServer(sql_server) => {
                Connection::SqlServer(sql_server.into_inline_connection(r))
            }
            Connection::IcebergCatalog(iceberg) => {
                Connection::IcebergCatalog(iceberg.into_inline_connection(r))
            }
        }
    }
}

impl<C: ConnectionAccess> Connection<C> {
    /// Whether this connection should be validated by default on creation.
    pub fn validate_by_default(&self) -> bool {
        match self {
            Connection::Kafka(conn) => conn.validate_by_default(),
            Connection::Csr(conn) => conn.validate_by_default(),
            Connection::Postgres(conn) => conn.validate_by_default(),
            Connection::Ssh(conn) => conn.validate_by_default(),
            Connection::Aws(conn) => conn.validate_by_default(),
            Connection::AwsPrivatelink(conn) => conn.validate_by_default(),
            Connection::MySql(conn) => conn.validate_by_default(),
            Connection::SqlServer(conn) => conn.validate_by_default(),
            Connection::IcebergCatalog(conn) => conn.validate_by_default(),
        }
    }
}

impl Connection<InlinedConnection> {
    /// Validates this connection by attempting to connect to the upstream system.
    pub async fn validate(
        &self,
        id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<(), ConnectionValidationError> {
        match self {
            Connection::Kafka(conn) => conn.validate(id, storage_configuration).await?,
            Connection::Csr(conn) => conn.validate(id, storage_configuration).await?,
            Connection::Postgres(conn) => {
                conn.validate(id, storage_configuration).await?;
            }
            Connection::Ssh(conn) => conn.validate(id, storage_configuration).await?,
            Connection::Aws(conn) => conn.validate(id, storage_configuration).await?,
            Connection::AwsPrivatelink(conn) => conn.validate(id, storage_configuration).await?,
            Connection::MySql(conn) => {
                conn.validate(id, storage_configuration).await?;
            }
            Connection::SqlServer(conn) => {
                conn.validate(id, storage_configuration).await?;
            }
            Connection::IcebergCatalog(conn) => conn.validate(id, storage_configuration).await?,
        }
        Ok(())
    }

    pub fn unwrap_kafka(self) -> <InlinedConnection as ConnectionAccess>::Kafka {
        match self {
            Self::Kafka(conn) => conn,
            o => unreachable!("{o:?} is not a Kafka connection"),
        }
    }

    pub fn unwrap_pg(self) -> <InlinedConnection as ConnectionAccess>::Pg {
        match self {
            Self::Postgres(conn) => conn,
            o => unreachable!("{o:?} is not a Postgres connection"),
        }
    }

    pub fn unwrap_mysql(self) -> <InlinedConnection as ConnectionAccess>::MySql {
        match self {
            Self::MySql(conn) => conn,
            o => unreachable!("{o:?} is not a MySQL connection"),
        }
    }

    pub fn unwrap_sql_server(self) -> <InlinedConnection as ConnectionAccess>::SqlServer {
        match self {
            Self::SqlServer(conn) => conn,
            o => unreachable!("{o:?} is not a SQL Server connection"),
        }
    }

    pub fn unwrap_aws(self) -> <InlinedConnection as ConnectionAccess>::Aws {
        match self {
            Self::Aws(conn) => conn,
            o => unreachable!("{o:?} is not an AWS connection"),
        }
    }

    pub fn unwrap_ssh(self) -> <InlinedConnection as ConnectionAccess>::Ssh {
        match self {
            Self::Ssh(conn) => conn,
            o => unreachable!("{o:?} is not an SSH connection"),
        }
    }

    pub fn unwrap_csr(self) -> <InlinedConnection as ConnectionAccess>::Csr {
        match self {
            Self::Csr(conn) => conn,
            o => unreachable!("{o:?} is not a CSR connection"),
        }
    }

    pub fn unwrap_iceberg_catalog(self) -> <InlinedConnection as ConnectionAccess>::IcebergCatalog {
        match self {
            Self::IcebergCatalog(conn) => conn,
            o => unreachable!("{o:?} is not an Iceberg catalog connection"),
        }
    }
}

/// An error returned by [`Connection::validate`].
#[derive(thiserror::Error, Debug)]
pub enum ConnectionValidationError {
    #[error(transparent)]
    Postgres(#[from] PostgresConnectionValidationError),
    #[error(transparent)]
    MySql(#[from] MySqlConnectionValidationError),
    #[error(transparent)]
    SqlServer(#[from] SqlServerConnectionValidationError),
    #[error(transparent)]
    Aws(#[from] AwsConnectionValidationError),
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}

impl ConnectionValidationError {
    /// Reports additional details about the error, if any are available.
    pub fn detail(&self) -> Option<String> {
        match self {
            ConnectionValidationError::Postgres(e) => e.detail(),
            ConnectionValidationError::MySql(e) => e.detail(),
            ConnectionValidationError::SqlServer(e) => e.detail(),
            ConnectionValidationError::Aws(e) => e.detail(),
            ConnectionValidationError::Other(_) => None,
        }
    }

    /// Reports a hint for the user about how the error could be fixed.
    pub fn hint(&self) -> Option<String> {
        match self {
            ConnectionValidationError::Postgres(e) => e.hint(),
            ConnectionValidationError::MySql(e) => e.hint(),
            ConnectionValidationError::SqlServer(e) => e.hint(),
            ConnectionValidationError::Aws(e) => e.hint(),
            ConnectionValidationError::Other(_) => None,
        }
    }
}

impl<C: ConnectionAccess> AlterCompatible for Connection<C> {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        match (self, other) {
            (Self::Aws(s), Self::Aws(o)) => s.alter_compatible(id, o),
            (Self::AwsPrivatelink(s), Self::AwsPrivatelink(o)) => s.alter_compatible(id, o),
            (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o),
            (Self::Csr(s), Self::Csr(o)) => s.alter_compatible(id, o),
            (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
            (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
            (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
            _ => {
                tracing::warn!(
                    "Connection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );
                Err(AlterError { id })
            }
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct RestIcebergCatalog {
    /// For REST catalogs, the oauth2 credential in a `CLIENT_ID:CLIENT_SECRET` format
    pub credential: StringOrSecret,
    /// The oauth2 scope for REST catalogs
    pub scope: Option<String>,
    /// The warehouse for REST catalogs
    pub warehouse: Option<String>,
}

#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct S3TablesRestIcebergCatalog<C: ConnectionAccess = InlinedConnection> {
    /// The AWS connection details for s3tables
    pub aws_connection: AwsConnectionReference<C>,
    /// The warehouse for s3tables
    pub warehouse: String,
}

impl<R: ConnectionResolver> IntoInlineConnection<S3TablesRestIcebergCatalog, R>
    for S3TablesRestIcebergCatalog<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> S3TablesRestIcebergCatalog {
        S3TablesRestIcebergCatalog {
            aws_connection: self.aws_connection.into_inline_connection(&r),
            warehouse: self.warehouse,
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogType {
    Rest,
    S3TablesRest,
}

#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogImpl<C: ConnectionAccess = InlinedConnection> {
    Rest(RestIcebergCatalog),
    S3TablesRest(S3TablesRestIcebergCatalog<C>),
}

impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogImpl, R>
    for IcebergCatalogImpl<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> IcebergCatalogImpl {
        match self {
            IcebergCatalogImpl::Rest(rest) => IcebergCatalogImpl::Rest(rest),
            IcebergCatalogImpl::S3TablesRest(s3tables) => {
                IcebergCatalogImpl::S3TablesRest(s3tables.into_inline_connection(r))
            }
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergCatalogConnection<C: ConnectionAccess = InlinedConnection> {
    /// The catalog implementation backing this connection
    pub catalog: IcebergCatalogImpl<C>,
    /// Where the catalog is located
    pub uri: reqwest::Url,
}

impl AlterCompatible for IcebergCatalogConnection {
    fn alter_compatible(&self, id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        Err(AlterError { id })
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogConnection, R>
    for IcebergCatalogConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> IcebergCatalogConnection {
        IcebergCatalogConnection {
            catalog: self.catalog.into_inline_connection(&r),
            uri: self.uri,
        }
    }
}

impl<C: ConnectionAccess> IcebergCatalogConnection<C> {
    fn validate_by_default(&self) -> bool {
        true
    }
}

impl IcebergCatalogConnection<InlinedConnection> {
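    /// Connects to the configured Iceberg catalog, dispatching on the
    /// underlying catalog implementation. A hedged sketch of typical use
    /// (mirroring `validate` below):
    ///
    /// ```ignore
    /// let catalog = connection.connect(storage_configuration, InTask::No).await?;
    /// let namespaces = catalog.list_namespaces(None).await?;
    /// ```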
    pub async fn connect(
        &self,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
        match self.catalog {
            IcebergCatalogImpl::S3TablesRest(ref s3tables) => {
                self.connect_s3tables(s3tables, storage_configuration, in_task)
                    .await
            }
            IcebergCatalogImpl::Rest(ref rest) => {
                self.connect_rest(rest, storage_configuration, in_task)
                    .await
            }
        }
    }

    pub fn catalog_type(&self) -> IcebergCatalogType {
        match self.catalog {
            IcebergCatalogImpl::S3TablesRest(_) => IcebergCatalogType::S3TablesRest,
            IcebergCatalogImpl::Rest(_) => IcebergCatalogType::Rest,
        }
    }

    pub fn s3tables_catalog(&self) -> Option<&S3TablesRestIcebergCatalog> {
        match &self.catalog {
            IcebergCatalogImpl::S3TablesRest(s3tables) => Some(s3tables),
            _ => None,
        }
    }

    pub fn rest_catalog(&self) -> Option<&RestIcebergCatalog> {
        match &self.catalog {
            IcebergCatalogImpl::Rest(rest) => Some(rest),
            _ => None,
        }
    }

    async fn connect_s3tables(
        &self,
        s3tables: &S3TablesRestIcebergCatalog,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
        let secret_reader = &storage_configuration.connection_context.secrets_reader;
        let aws_ref = &s3tables.aws_connection;
        let aws_config = aws_ref
            .connection
            .load_sdk_config(
                &storage_configuration.connection_context,
                aws_ref.connection_id,
                in_task,
            )
            .await?;

        let aws_region = aws_ref
            .connection
            .region
            .clone()
            .unwrap_or_else(|| "us-east-1".to_string());

        let mut props = vec![
            (S3_REGION.to_string(), aws_region),
            (S3_DISABLE_EC2_METADATA.to_string(), "true".to_string()),
            (
                REST_CATALOG_PROP_WAREHOUSE.to_string(),
                s3tables.warehouse.clone(),
            ),
            (REST_CATALOG_PROP_URI.to_string(), self.uri.to_string()),
        ];

        let aws_auth = aws_ref.connection.auth.clone();

        if let AwsAuth::Credentials(creds) = &aws_auth {
            props.push((
                S3_ACCESS_KEY_ID.to_string(),
                creds
                    .access_key_id
                    .get_string(in_task, secret_reader)
                    .await?,
            ));
            props.push((
                S3_SECRET_ACCESS_KEY.to_string(),
                secret_reader.read_string(creds.secret_access_key).await?,
            ));
        }

        // Build the catalog with aws_config for REST API signing.
        // For AssumeRole auth, we also add a FileIO extension so OpenDAL can
        // use our credential chain for S3 object access.
        let catalog = RestCatalogBuilder::default()
            .with_aws_config(aws_config.clone())
            .load("IcebergCatalog", props.into_iter().collect())
            .await?;

        let catalog = if matches!(aws_auth, AwsAuth::AssumeRole(_)) {
            let credentials_provider = aws_config
                .credentials_provider()
                .ok_or_else(|| anyhow!("aws_config missing credentials provider"))?;
            let file_io_loader = CustomAwsCredentialLoader::new(Arc::new(
                AwsSdkCredentialLoader::new(credentials_provider),
            ));
            catalog.with_file_io_extension(file_io_loader)
        } else {
            catalog
        };

        Ok(Arc::new(catalog))
    }

    async fn connect_rest(
        &self,
        rest: &RestIcebergCatalog,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
        let mut props = BTreeMap::from([(
            REST_CATALOG_PROP_URI.to_string(),
            self.uri.to_string(),
        )]);

        if let Some(warehouse) = &rest.warehouse {
            props.insert(REST_CATALOG_PROP_WAREHOUSE.to_string(), warehouse.clone());
        }

        let credential = rest
            .credential
            .get_string(
                in_task,
                &storage_configuration.connection_context.secrets_reader,
            )
            .await
            .map_err(|e| anyhow!("failed to read Iceberg catalog credential: {e}"))?;
        props.insert(REST_CATALOG_PROP_CREDENTIAL.to_string(), credential);

        if let Some(scope) = &rest.scope {
            props.insert(REST_CATALOG_PROP_SCOPE.to_string(), scope.clone());
        }

        let catalog = RestCatalogBuilder::default()
            .load("IcebergCatalog", props.into_iter().collect())
            .await
            .map_err(|e| anyhow!("failed to create Iceberg catalog: {e}"))?;
        Ok(Arc::new(catalog))
    }

    async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<(), ConnectionValidationError> {
        let catalog = self
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| {
                ConnectionValidationError::Other(anyhow!("failed to connect to catalog: {e}"))
            })?;

        // If we can list namespaces, the connection is valid.
        catalog.list_namespaces(None).await.map_err(|e| {
            ConnectionValidationError::Other(anyhow!("failed to list namespaces: {e}"))
        })?;

        Ok(())
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelinkConnection {
    pub service_name: String,
    pub availability_zones: Vec<String>,
}

impl AlterCompatible for AwsPrivatelinkConnection {
    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        // Every element of `AwsPrivatelinkConnection` is configurable.
        Ok(())
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaTlsConfig {
    pub identity: Option<TlsIdentity>,
    pub root_cert: Option<StringOrSecret>,
}

#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaSaslConfig<C: ConnectionAccess = InlinedConnection> {
    pub mechanism: String,
    pub username: StringOrSecret,
    pub password: Option<CatalogItemId>,
    pub aws: Option<AwsConnectionReference<C>>,
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaSaslConfig, R>
    for KafkaSaslConfig<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaSaslConfig {
        KafkaSaslConfig {
            mechanism: self.mechanism,
            username: self.username,
            password: self.password,
            aws: self.aws.map(|aws| aws.into_inline_connection(&r)),
        }
    }
}

/// Specifies a Kafka broker in a [`KafkaConnection`].
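///
/// Broker addresses take the `host[:port]` form parsed by
/// `create_with_context` below; when the port is omitted, the Kafka default
/// of 9092 is assumed.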
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaBroker<C: ConnectionAccess = InlinedConnection> {
    /// The address of the Kafka broker.
    pub address: String,
    /// An optional tunnel to use when connecting to the broker.
    pub tunnel: Tunnel<C>,
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaBroker, R>
    for KafkaBroker<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaBroker {
        let KafkaBroker { address, tunnel } = self;
        KafkaBroker {
            address,
            tunnel: tunnel.into_inline_connection(r),
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Default)]
pub struct KafkaTopicOptions {
    /// The replication factor for the topic.
    /// If `None`, the broker default will be used.
    pub replication_factor: Option<NonNeg<i32>>,
    /// The number of partitions to create.
    /// If `None`, the broker default will be used.
    pub partition_count: Option<NonNeg<i32>>,
    /// The initial configuration parameters for the topic.
    pub topic_config: BTreeMap<String, String>,
}

#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaConnection<C: ConnectionAccess = InlinedConnection> {
    pub brokers: Vec<KafkaBroker<C>>,
    /// A tunnel through which to route traffic; it can be overridden
    /// for individual brokers in `brokers`.
    pub default_tunnel: Tunnel<C>,
    pub progress_topic: Option<String>,
    pub progress_topic_options: KafkaTopicOptions,
    pub options: BTreeMap<String, StringOrSecret>,
    pub tls: Option<KafkaTlsConfig>,
    pub sasl: Option<KafkaSaslConfig<C>>,
}

impl<R: ConnectionResolver> IntoInlineConnection<KafkaConnection, R>
    for KafkaConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> KafkaConnection {
        let KafkaConnection {
            brokers,
            progress_topic,
            progress_topic_options,
            default_tunnel,
            options,
            tls,
            sasl,
        } = self;

        let brokers = brokers
            .into_iter()
            .map(|broker| broker.into_inline_connection(&r))
            .collect();

        KafkaConnection {
            brokers,
            progress_topic,
            progress_topic_options,
            default_tunnel: default_tunnel.into_inline_connection(&r),
            options,
            tls,
            sasl: sasl.map(|sasl| sasl.into_inline_connection(&r)),
        }
    }
}

impl<C: ConnectionAccess> KafkaConnection<C> {
    /// Returns the name of the progress topic to use for the connection.
    ///
    /// The caller is responsible for providing the connection ID as it is not
    /// known to `KafkaConnection`.
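    ///
    /// A sketch of the fallback naming (identifiers are illustrative):
    ///
    /// ```ignore
    /// // With no explicit progress topic configured, this yields something like
    /// // "_materialize-progress-<environment_id>-<connection_id>".
    /// let topic = connection.progress_topic(&connection_context, connection_id);
    /// ```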
    pub fn progress_topic(
        &self,
        connection_context: &ConnectionContext,
        connection_id: CatalogItemId,
    ) -> Cow<'_, str> {
        if let Some(progress_topic) = &self.progress_topic {
            Cow::Borrowed(progress_topic)
        } else {
            Cow::Owned(format!(
                "_materialize-progress-{}-{}",
                connection_context.environment_id, connection_id,
            ))
        }
    }

    fn validate_by_default(&self) -> bool {
        true
    }
}

impl KafkaConnection {
    /// Generates a string that can be used as the base for a configuration ID
    /// (e.g., `client.id`, `group.id`, `transactional.id`) for a Kafka source
    /// or sink.
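    ///
    /// A sketch of the resulting format (values are illustrative):
    ///
    /// ```ignore
    /// // Yields "materialize-<environment_id>-<connection_id>-<object_id>".
    /// let base = KafkaConnection::id_base(&connection_context, connection_id, object_id);
    /// ```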
    pub fn id_base(
        connection_context: &ConnectionContext,
        connection_id: CatalogItemId,
        object_id: GlobalId,
    ) -> String {
        format!(
            "materialize-{}-{}-{}",
            connection_context.environment_id, connection_id, object_id,
        )
    }

    /// Enriches the provided `client_id` according to any enrichment rules in
    /// the `kafka_client_id_enrichment_rules` configuration parameter.
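    ///
    /// Rules are decoded from JSON as a list of objects with a regex `pattern`
    /// (matched against broker addresses) and a `payload` appended to the
    /// client ID. An illustrative rule:
    ///
    /// ```ignore
    /// [{"pattern": "broker1\\.example\\.com", "payload": "team-a"}]
    /// ```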
    pub fn enrich_client_id(&self, configs: &ConfigSet, client_id: &mut String) {
        #[derive(Debug, Deserialize)]
        struct EnrichmentRule {
            #[serde(deserialize_with = "deserialize_regex")]
            pattern: Regex,
            payload: String,
        }

        fn deserialize_regex<'de, D>(deserializer: D) -> Result<Regex, D::Error>
        where
            D: Deserializer<'de>,
        {
            let buf = String::deserialize(deserializer)?;
            Regex::new(&buf).map_err(serde::de::Error::custom)
        }

        let rules = KAFKA_CLIENT_ID_ENRICHMENT_RULES.get(configs);
        let rules = match serde_json::from_value::<Vec<EnrichmentRule>>(rules) {
            Ok(rules) => rules,
            Err(e) => {
                warn!(%e, "failed to decode kafka_client_id_enrichment_rules");
                return;
            }
        };

        // Check every rule against every broker. Rules are matched in the order
        // that they are specified. It is usually a configuration error if
        // multiple rules match the same list of Kafka brokers, but we
        // nonetheless want to provide well-defined semantics.
        debug!(?self.brokers, "evaluating client ID enrichment rules");
        for rule in rules {
            let is_match = self
                .brokers
                .iter()
                .any(|b| rule.pattern.is_match(&b.address));
            debug!(?rule, is_match, "evaluated client ID enrichment rule");
            if is_match {
                client_id.push('-');
                client_id.push_str(&rule.payload);
            }
        }
    }

    /// Creates a Kafka client for the connection.
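    ///
    /// A hedged sketch of constructing a consumer (mirroring `validate` below):
    ///
    /// ```ignore
    /// let (context, _errors) = MzClientContext::with_errors();
    /// let consumer: BaseConsumer<_> = connection
    ///     .create_with_context(storage_configuration, context, &BTreeMap::new(), InTask::No)
    ///     .await?;
    /// ```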
    pub async fn create_with_context<C, T>(
        &self,
        storage_configuration: &StorageConfiguration,
        context: C,
        extra_options: &BTreeMap<&str, String>,
        in_task: InTask,
    ) -> Result<T, ContextCreationError>
    where
        C: ClientContext,
        T: FromClientConfigAndContext<TunnelingClientContext<C>>,
    {
        let mut options = self.options.clone();

        // Ensure that Kafka topics are *not* automatically created when
        // consuming, producing, or fetching metadata for a topic. This ensures
        // that we don't accidentally create topics with the wrong number of
        // partitions.
        options.insert("allow.auto.create.topics".into(), "false".into());

        let brokers = match &self.default_tunnel {
            Tunnel::AwsPrivatelink(t) => {
                assert!(self.brokers.is_empty());

                let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM
                    .get(storage_configuration.config_set());
                options.insert("ssl.endpoint.identification.algorithm".into(), algo.into());

                // When using a default PrivateLink tunnel, brokers cannot be specified;
                // instead, the tunnel's connection_id and port are used for the initial connection.
                format!(
                    "{}:{}",
                    vpc_endpoint_host(
                        t.connection_id,
                        None, // Default tunnel does not support availability zones.
                    ),
                    t.port.unwrap_or(9092)
                )
            }
            _ => self.brokers.iter().map(|b| &b.address).join(","),
        };
        options.insert("bootstrap.servers".into(), brokers.into());
        let security_protocol = match (self.tls.is_some(), self.sasl.is_some()) {
            (false, false) => "PLAINTEXT",
            (true, false) => "SSL",
            (false, true) => "SASL_PLAINTEXT",
            (true, true) => "SASL_SSL",
        };
        options.insert("security.protocol".into(), security_protocol.into());
        if let Some(tls) = &self.tls {
            if let Some(root_cert) = &tls.root_cert {
                options.insert("ssl.ca.pem".into(), root_cert.clone());
            }
            if let Some(identity) = &tls.identity {
                options.insert("ssl.key.pem".into(), StringOrSecret::Secret(identity.key));
                options.insert("ssl.certificate.pem".into(), identity.cert.clone());
            }
        }
        if let Some(sasl) = &self.sasl {
            options.insert("sasl.mechanisms".into(), (&sasl.mechanism).into());
            options.insert("sasl.username".into(), sasl.username.clone());
            if let Some(password) = sasl.password {
                options.insert("sasl.password".into(), StringOrSecret::Secret(password));
            }
        }

        options.insert(
            "retry.backoff.ms".into(),
            KAFKA_RETRY_BACKOFF
                .get(storage_configuration.config_set())
                .as_millis()
                .into(),
        );
        options.insert(
            "retry.backoff.max.ms".into(),
            KAFKA_RETRY_BACKOFF_MAX
                .get(storage_configuration.config_set())
                .as_millis()
                .into(),
        );
        options.insert(
            "reconnect.backoff.ms".into(),
            KAFKA_RECONNECT_BACKOFF
                .get(storage_configuration.config_set())
                .as_millis()
                .into(),
        );
        options.insert(
            "reconnect.backoff.max.ms".into(),
            KAFKA_RECONNECT_BACKOFF_MAX
                .get(storage_configuration.config_set())
                .as_millis()
                .into(),
        );

        let mut config = mz_kafka_util::client::create_new_client_config(
            storage_configuration
                .connection_context
                .librdkafka_log_level,
            storage_configuration.parameters.kafka_timeout_config,
        );
        for (k, v) in options {
            config.set(
                k,
                v.get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await
                .context("reading kafka secret")?,
            );
        }
        for (k, v) in extra_options {
            config.set(*k, v);
        }

        let aws_config = match self.sasl.as_ref().and_then(|sasl| sasl.aws.as_ref()) {
            None => None,
            Some(aws) => Some(
                aws.connection
                    .load_sdk_config(
                        &storage_configuration.connection_context,
                        aws.connection_id,
                        in_task,
                    )
                    .await?,
            ),
        };

        // TODO(roshan): Implement enforcement of external address validation once
        // the rdkafka client has been updated to support providing multiple
        // resolved addresses for brokers.
        let mut context = TunnelingClientContext::new(
            context,
            Handle::current(),
            storage_configuration
                .connection_context
                .ssh_tunnel_manager
                .clone(),
            storage_configuration.parameters.ssh_timeout_config,
            aws_config,
            in_task,
        );

        match &self.default_tunnel {
            Tunnel::Direct => {
                // By default, don't offer a default override for broker address lookup.
            }
            Tunnel::AwsPrivatelink(pl) => {
                context.set_default_tunnel(TunnelConfig::StaticHost(vpc_endpoint_host(
                    pl.connection_id,
                    None, // Default tunnel does not support availability zones.
                )));
            }
            Tunnel::Ssh(ssh_tunnel) => {
                let secret = storage_configuration
                    .connection_context
                    .secrets_reader
                    .read_in_task_if(in_task, ssh_tunnel.connection_id)
                    .await?;
                let key_pair = SshKeyPair::from_bytes(&secret)?;

                // Ensure any ssh-bastion address we connect to is resolved to an external address.
                let resolved = resolve_address(
                    &ssh_tunnel.connection.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig {
                    host: resolved
                        .iter()
                        .map(|a| a.to_string())
                        .collect::<BTreeSet<_>>(),
                    port: ssh_tunnel.connection.port,
                    user: ssh_tunnel.connection.user.clone(),
                    key_pair,
                }));
            }
        }

        for broker in &self.brokers {
            let mut addr_parts = broker.address.splitn(2, ':');
            let addr = BrokerAddr {
                host: addr_parts
                    .next()
                    .context("BROKER is not address:port")?
                    .into(),
                port: addr_parts
                    .next()
                    .unwrap_or("9092")
                    .parse()
                    .context("parsing BROKER port")?,
            };
            match &broker.tunnel {
                Tunnel::Direct => {
                    // By default, don't override broker address lookup.
                    //
                    // N.B.
                    //
                    // We _could_ pre-set up the default SSH tunnel for all known brokers here,
                    // but we avoid doing so because:
                    // - It's not necessary.
                    // - Not doing so makes it easier to test the `FailedDefaultSshTunnel` path
                    //   in the `TunnelingClientContext`.
                }
                Tunnel::AwsPrivatelink(aws_privatelink) => {
                    let host = mz_cloud_resources::vpc_endpoint_host(
                        aws_privatelink.connection_id,
                        aws_privatelink.availability_zone.as_deref(),
                    );
                    let port = aws_privatelink.port;
                    context.add_broker_rewrite(
                        addr,
                        BrokerRewrite {
                            host: host.clone(),
                            port,
                        },
                    );
                }
                Tunnel::Ssh(ssh_tunnel) => {
                    // Ensure any SSH bastion address we connect to is resolved to an external address.
                    let ssh_host_resolved = resolve_address(
                        &ssh_tunnel.connection.host,
                        ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                    )
                    .await?;
                    context
                        .add_ssh_tunnel(
                            addr,
                            SshTunnelConfig {
                                host: ssh_host_resolved
                                    .iter()
                                    .map(|a| a.to_string())
                                    .collect::<BTreeSet<_>>(),
                                port: ssh_tunnel.connection.port,
                                user: ssh_tunnel.connection.user.clone(),
                                key_pair: SshKeyPair::from_bytes(
                                    &storage_configuration
                                        .connection_context
                                        .secrets_reader
                                        .read_in_task_if(in_task, ssh_tunnel.connection_id)
                                        .await?,
                                )?,
                            },
                        )
                        .await
                        .map_err(ContextCreationError::Ssh)?;
                }
            }
        }

        Ok(config.create_with_context(context)?)
    }

    async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<(), anyhow::Error> {
        let (context, error_rx) = MzClientContext::with_errors();
        let consumer: BaseConsumer<_> = self
            .create_with_context(
                storage_configuration,
                context,
                &BTreeMap::new(),
                // We are in a normal tokio context during validation, already.
                InTask::No,
            )
            .await?;
        let consumer = Arc::new(consumer);

        let timeout = storage_configuration
            .parameters
            .kafka_timeout_config
            .fetch_metadata_timeout;

        // librdkafka doesn't expose an API for determining whether a connection to
        // the Kafka cluster has been successfully established. So we make a
        // metadata request, though we don't care about the results, so that we can
        // report any errors making that request. If the request succeeds, we know
        // we were able to contact at least one broker, and that's a good proxy for
        // being able to contact all the brokers in the cluster.
        //
        // The downside of this approach is it produces a generic error message like
        // "metadata fetch error" with no additional details. The real networking
        // error is buried in the librdkafka logs, which are not visible to users.
        let result = mz_ore::task::spawn_blocking(|| "kafka_get_metadata", {
            let consumer = Arc::clone(&consumer);
            move || consumer.fetch_metadata(None, timeout)
        })
        .await;
        match result {
            Ok(_) => Ok(()),
            // The error returned by `fetch_metadata` does not provide any details,
            // which makes for a crappy user-facing error message. For this reason we
            // attempt to grab a better error message from the client context, which
            // should contain any error logs emitted by librdkafka, and fall back to
            // the generic error if there is nothing there.
            Err(err) => {
                // Multiple errors might have been logged during this validation, but
                // some are more relevant than others. Specifically, we prefer
                // non-internal errors over internal errors, since they give much more
                // useful information to users.
                let main_err = error_rx.try_iter().reduce(|cur, new| match cur {
                    MzKafkaError::Internal(_) => new,
                    _ => cur,
                });

                // Don't drop the consumer until after we've drained the errors
                // channel. Dropping the consumer can introduce spurious errors.
                // See database-issues#7432.
                drop(consumer);

                match main_err {
                    Some(err) => Err(err.into()),
                    None => Err(err.into()),
                }
            }
        }
    }
}

impl<C: ConnectionAccess> AlterCompatible for KafkaConnection<C> {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        let KafkaConnection {
            brokers: _,
            default_tunnel: _,
            progress_topic,
            progress_topic_options,
            options: _,
            tls: _,
            sasl: _,
        } = self;

        let compatibility_checks = [
            (progress_topic == &other.progress_topic, "progress_topic"),
            (
                progress_topic_options == &other.progress_topic_options,
                "progress_topic_options",
            ),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "KafkaConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

/// A connection to a Confluent Schema Registry.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnection<C: ConnectionAccess = InlinedConnection> {
    /// The URL of the schema registry.
    pub url: Url,
    /// Trusted root TLS certificate in PEM format.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication with the schema
    /// registry.
    pub tls_identity: Option<TlsIdentity>,
    /// Optional HTTP authentication credentials for the schema registry.
    pub http_auth: Option<CsrConnectionHttpAuth>,
    /// A tunnel through which to route traffic.
    pub tunnel: Tunnel<C>,
}

impl<R: ConnectionResolver> IntoInlineConnection<CsrConnection, R>
    for CsrConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> CsrConnection {
        let CsrConnection {
            url,
            tls_root_cert,
            tls_identity,
            http_auth,
            tunnel,
        } = self;
        CsrConnection {
            url,
            tls_root_cert,
            tls_identity,
            http_auth,
            tunnel: tunnel.into_inline_connection(r),
        }
    }
}

impl<C: ConnectionAccess> CsrConnection<C> {
    fn validate_by_default(&self) -> bool {
        true
    }
}

impl CsrConnection {
    /// Constructs a schema registry client from the connection.
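    ///
    /// A hedged sketch (mirroring `validate` below):
    ///
    /// ```ignore
    /// let client = csr_connection.connect(storage_configuration, InTask::No).await?;
    /// let subjects = client.list_subjects().await?;
    /// ```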
    pub async fn connect(
        &self,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_ccsr::Client, CsrConnectError> {
        let mut client_config = mz_ccsr::ClientConfig::new(self.url.clone());
        if let Some(root_cert) = &self.tls_root_cert {
            let root_cert = root_cert
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let root_cert = Certificate::from_pem(root_cert.as_bytes())?;
            client_config = client_config.add_root_certificate(root_cert);
        }

        if let Some(tls_identity) = &self.tls_identity {
            let key = &storage_configuration
                .connection_context
                .secrets_reader
                .read_string_in_task_if(in_task, tls_identity.key)
                .await?;
            let cert = tls_identity
                .cert
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let ident = Identity::from_pem(key.as_bytes(), cert.as_bytes())?;
            client_config = client_config.identity(ident);
        }

        if let Some(http_auth) = &self.http_auth {
            let username = http_auth
                .username
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let password = match http_auth.password {
                None => None,
                Some(password) => Some(
                    storage_configuration
                        .connection_context
                        .secrets_reader
                        .read_string_in_task_if(in_task, password)
                        .await?,
                ),
            };
            client_config = client_config.auth(username, password);
        }

        // TODO: use types to enforce that the URL has a string hostname.
        let host = self
            .url
            .host_str()
            .ok_or_else(|| anyhow!("url missing host"))?;
        match &self.tunnel {
            Tunnel::Direct => {
                // Ensure any host we connect to is resolved to an external address.
                let resolved = resolve_address(
                    host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                client_config = client_config.resolve_to_addrs(
                    host,
                    &resolved
                        .iter()
                        .map(|addr| SocketAddr::new(*addr, 0))
                        .collect::<Vec<_>>(),
                )
            }
            Tunnel::Ssh(ssh_tunnel) => {
                let ssh_tunnel = ssh_tunnel
                    .connect(
                        storage_configuration,
                        host,
                        // Default to the default http port, but this
                        // could default to 8081...
                        self.url.port().unwrap_or(80),
                        in_task,
                    )
                    .await
                    .map_err(CsrConnectError::Ssh)?;

                // Carefully inject the SSH tunnel into the client
                // configuration. This is delicate because we need TLS
                // verification to continue to use the remote hostname rather
                // than the tunnel hostname.

                client_config = client_config
                    // `resolve_to_addrs` allows us to rewrite the hostname
                    // at the DNS level, which means the TCP connection is
                    // correctly routed through the tunnel, but TLS verification
                    // is still performed against the remote hostname.
                    // Unfortunately the port here is ignored if the URL also
                    // specifies a port...
                    .resolve_to_addrs(host, &[SocketAddr::new(ssh_tunnel.local_addr().ip(), 0)])
                    // ...so we also dynamically rewrite the URL to use the
                    // current port for the SSH tunnel.
                    //
1401                    // WARNING: this is brittle, because we only dynamically
1402                    // update the client configuration with the tunnel *port*,
1403                    // and not the hostname. This works fine in practice, because
1404                    // only the SSH tunnel port will change if the tunnel fails
1405                    // and has to be restarted (the hostname is always
1406                    // 127.0.0.1)--but this is an implementation detail of
1407                    // the SSH tunnel code that we're relying on.
1408                    .dynamic_url({
1409                        let remote_url = self.url.clone();
1410                        move || {
1411                            let mut url = remote_url.clone();
1412                            url.set_port(Some(ssh_tunnel.local_addr().port()))
1413                                .expect("cannot fail");
1414                            url
1415                        }
1416                    });
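                // For example, if the registry URL is
                // `https://csr.example.com:8081` and the SSH tunnel is
                // currently listening on 127.0.0.1:39123, requests are sent
                // to 127.0.0.1:39123 while TLS verification still uses
                // `csr.example.com`. (Hostname and ports here are
                // illustrative.)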
1417            }
1418            Tunnel::AwsPrivatelink(connection) => {
1419                assert_none!(connection.port);
1420
1421                let privatelink_host = mz_cloud_resources::vpc_endpoint_host(
1422                    connection.connection_id,
1423                    connection.availability_zone.as_deref(),
1424                );
1425                let addrs: Vec<_> = net::lookup_host((privatelink_host, 0))
1426                    .await
1427                    .context("resolving PrivateLink host")?
1428                    .collect();
1429                client_config = client_config.resolve_to_addrs(host, &addrs)
1430            }
1431        }
1432
1433        Ok(client_config.build()?)
1434    }
1435
1436    async fn validate(
1437        &self,
1438        _id: CatalogItemId,
1439        storage_configuration: &StorageConfiguration,
1440    ) -> Result<(), anyhow::Error> {
1441        let client = self
1442            .connect(
1443                storage_configuration,
1444                // We are already in a normal tokio context during validation.
1445                InTask::No,
1446            )
1447            .await?;
1448        client.list_subjects().await?;
1449        Ok(())
1450    }
1451}
1452
1453impl<C: ConnectionAccess> AlterCompatible for CsrConnection<C> {
1454    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1455        let CsrConnection {
1456            tunnel,
1457            // All non-tunnel fields may change
1458            url: _,
1459            tls_root_cert: _,
1460            tls_identity: _,
1461            http_auth: _,
1462        } = self;
1463
1464        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1465
1466        for (compatible, field) in compatibility_checks {
1467            if !compatible {
1468                tracing::warn!(
1469                    "CsrConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1470                    self,
1471                    other
1472                );
1473
1474                return Err(AlterError { id });
1475            }
1476        }
1477        Ok(())
1478    }
1479}
1480
1481/// A TLS key pair used for client identity.
1482#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
1483pub struct TlsIdentity {
1484    /// The client's TLS public certificate in PEM format.
1485    pub cert: StringOrSecret,
1486    /// The ID of the secret containing the client's TLS private key in PEM
1487    /// format.
1488    pub key: CatalogItemId,
1489}
1490
1491/// HTTP authentication credentials in a [`CsrConnection`].
1492#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
1493pub struct CsrConnectionHttpAuth {
1494    /// The username.
1495    pub username: StringOrSecret,
1496    /// The ID of the secret containing the password, if any.
1497    pub password: Option<CatalogItemId>,
1498}
1499
1500/// A connection to a PostgreSQL server.
1501#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
1502pub struct PostgresConnection<C: ConnectionAccess = InlinedConnection> {
1503    /// The hostname of the server.
1504    pub host: String,
1505    /// The port of the server.
1506    pub port: u16,
1507    /// The name of the database to connect to.
1508    pub database: String,
1509    /// The username to authenticate as.
1510    pub user: StringOrSecret,
1511    /// An optional password for authentication.
1512    pub password: Option<CatalogItemId>,
1513    /// A tunnel through which to route traffic.
1514    pub tunnel: Tunnel<C>,
1515    /// Whether to use TLS for encryption, authentication, or both.
1516    pub tls_mode: SslMode,
1517    /// An optional root TLS certificate in PEM format, to verify the server's
1518    /// identity.
1519    pub tls_root_cert: Option<StringOrSecret>,
1520    /// An optional TLS client certificate for authentication.
1521    pub tls_identity: Option<TlsIdentity>,
1522}
1523
1524impl<R: ConnectionResolver> IntoInlineConnection<PostgresConnection, R>
1525    for PostgresConnection<ReferencedConnection>
1526{
1527    fn into_inline_connection(self, r: R) -> PostgresConnection {
1528        let PostgresConnection {
1529            host,
1530            port,
1531            database,
1532            user,
1533            password,
1534            tunnel,
1535            tls_mode,
1536            tls_root_cert,
1537            tls_identity,
1538        } = self;
1539
1540        PostgresConnection {
1541            host,
1542            port,
1543            database,
1544            user,
1545            password,
1546            tunnel: tunnel.into_inline_connection(r),
1547            tls_mode,
1548            tls_root_cert,
1549            tls_identity,
1550        }
1551    }
1552}
1553
1554impl<C: ConnectionAccess> PostgresConnection<C> {
1555    fn validate_by_default(&self) -> bool {
1556        true
1557    }
1558}
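
// A minimal sketch (with hypothetical values) of constructing a
// `PostgresConnection` by hand; it assumes `StringOrSecret::String` is the
// inline-string variant.
#[cfg(test)]
mod postgres_connection_tests {
    use super::*;

    #[test]
    fn postgres_connections_validate_by_default() {
        let conn = PostgresConnection::<InlinedConnection> {
            host: "db.example.com".into(),
            port: 5432,
            database: "materialize".into(),
            user: StringOrSecret::String("mz_user".into()),
            password: None,
            tunnel: Tunnel::Direct,
            tls_mode: SslMode::Require,
            tls_root_cert: None,
            tls_identity: None,
        };
        assert!(conn.validate_by_default());
    }
}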
1559
1560impl PostgresConnection<InlinedConnection> {
1561    pub async fn config(
1562        &self,
1563        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
1564        storage_configuration: &StorageConfiguration,
1565        in_task: InTask,
1566    ) -> Result<mz_postgres_util::Config, anyhow::Error> {
1567        let params = &storage_configuration.parameters;
1568
1569        let mut config = tokio_postgres::Config::new();
1570        config
1571            .host(&self.host)
1572            .port(self.port)
1573            .dbname(&self.database)
1574            .user(&self.user.get_string(in_task, secrets_reader).await?)
1575            .ssl_mode(self.tls_mode);
1576        if let Some(password) = self.password {
1577            let password = secrets_reader
1578                .read_string_in_task_if(in_task, password)
1579                .await?;
1580            config.password(password);
1581        }
1582        if let Some(tls_root_cert) = &self.tls_root_cert {
1583            let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
1584            config.ssl_root_cert(tls_root_cert.as_bytes());
1585        }
1586        if let Some(tls_identity) = &self.tls_identity {
1587            let cert = tls_identity
1588                .cert
1589                .get_string(in_task, secrets_reader)
1590                .await?;
1591            let key = secrets_reader
1592                .read_string_in_task_if(in_task, tls_identity.key)
1593                .await?;
1594            config.ssl_cert(cert.as_bytes()).ssl_key(key.as_bytes());
1595        }
1596
1597        if let Some(connect_timeout) = params.pg_source_connect_timeout {
1598            config.connect_timeout(connect_timeout);
1599        }
1600        if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
1601            config.keepalives_retries(keepalives_retries);
1602        }
1603        if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
1604            config.keepalives_idle(keepalives_idle);
1605        }
1606        if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
1607            config.keepalives_interval(keepalives_interval);
1608        }
1609        if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
1610            config.tcp_user_timeout(tcp_user_timeout);
1611        }
1612
1613        let mut options = vec![];
1614        if let Some(wal_sender_timeout) = params.pg_source_wal_sender_timeout {
1615            options.push(format!(
1616                "--wal_sender_timeout={}",
1617                wal_sender_timeout.as_millis()
1618            ));
1619        };
1620        if params.pg_source_tcp_configure_server {
1621            if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
1622                options.push(format!("--tcp_keepalives_count={}", keepalives_retries));
1623            }
1624            if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
1625                options.push(format!(
1626                    "--tcp_keepalives_idle={}",
1627                    keepalives_idle.as_secs()
1628                ));
1629            }
1630            if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
1631                options.push(format!(
1632                    "--tcp_keepalives_interval={}",
1633                    keepalives_interval.as_secs()
1634                ));
1635            }
1636            if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
1637                options.push(format!(
1638                    "--tcp_user_timeout={}",
1639                    tcp_user_timeout.as_millis()
1640                ));
1641            }
1642        }
1643        config.options(options.join(" ").as_str());
1644
1645        let tunnel = match &self.tunnel {
1646            Tunnel::Direct => {
1647                // Ensure any host we connect to is resolved to an external address.
1648                let resolved = resolve_address(
1649                    &self.host,
1650                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1651                )
1652                .await?;
1653                mz_postgres_util::TunnelConfig::Direct {
1654                    resolved_ips: Some(resolved),
1655                }
1656            }
1657            Tunnel::Ssh(SshTunnel {
1658                connection_id,
1659                connection,
1660            }) => {
1661                let secret = secrets_reader
1662                    .read_in_task_if(in_task, *connection_id)
1663                    .await?;
1664                let key_pair = SshKeyPair::from_bytes(&secret)?;
1665                // Ensure any ssh-bastion host we connect to is resolved to an external address.
1666                let resolved = resolve_address(
1667                    &connection.host,
1668                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1669                )
1670                .await?;
1671                mz_postgres_util::TunnelConfig::Ssh {
1672                    config: SshTunnelConfig {
1673                        host: resolved
1674                            .iter()
1675                            .map(|a| a.to_string())
1676                            .collect::<BTreeSet<_>>(),
1677                        port: connection.port,
1678                        user: connection.user.clone(),
1679                        key_pair,
1680                    },
1681                }
1682            }
1683            Tunnel::AwsPrivatelink(connection) => {
1684                assert_none!(connection.port);
1685                mz_postgres_util::TunnelConfig::AwsPrivatelink {
1686                    connection_id: connection.connection_id,
1687                }
1688            }
1689        };
1690
1691        Ok(mz_postgres_util::Config::new(
1692            config,
1693            tunnel,
1694            params.ssh_timeout_config,
1695            in_task,
1696        )?)
1697    }
1698
1699    pub async fn validate(
1700        &self,
1701        _id: CatalogItemId,
1702        storage_configuration: &StorageConfiguration,
1703    ) -> Result<mz_postgres_util::Client, anyhow::Error> {
1704        let config = self
1705            .config(
1706                &storage_configuration.connection_context.secrets_reader,
1707                storage_configuration,
1708                // We are already in a normal tokio context during validation.
1709                InTask::No,
1710            )
1711            .await?;
1712        let client = config
1713            .connect(
1714                "connection validation",
1715                &storage_configuration.connection_context.ssh_tunnel_manager,
1716            )
1717            .await?;
1718
1719        let wal_level = mz_postgres_util::get_wal_level(&client).await?;
1720
1721        if wal_level < mz_postgres_util::replication::WalLevel::Logical {
1722            Err(PostgresConnectionValidationError::InsufficientWalLevel { wal_level })?;
1723        }
1724
1725        let max_wal_senders = mz_postgres_util::get_max_wal_senders(&client).await?;
1726
1727        if max_wal_senders < 1 {
1728            Err(PostgresConnectionValidationError::ReplicationDisabled)?;
1729        }
1730
1731        let available_replication_slots =
1732            mz_postgres_util::available_replication_slots(&client).await?;
1733
1734        // We need 1 replication slot for snapshots and 1 for ongoing replication.
1735        if available_replication_slots < 2 {
1736            Err(
1737                PostgresConnectionValidationError::InsufficientReplicationSlotsAvailable {
1738                    count: 2,
1739                },
1740            )?;
1741        }
1742
1743        Ok(client)
1744    }
1745}
1746
1747#[derive(Debug, Clone, thiserror::Error)]
1748pub enum PostgresConnectionValidationError {
1749    #[error("PostgreSQL server has an insufficient number of replication slots available")]
1750    InsufficientReplicationSlotsAvailable { count: usize },
1751    #[error("server must have wal_level >= logical, but has {wal_level}")]
1752    InsufficientWalLevel {
1753        wal_level: mz_postgres_util::replication::WalLevel,
1754    },
1755    #[error("replication disabled on server")]
1756    ReplicationDisabled,
1757}
1758
1759impl PostgresConnectionValidationError {
1760    pub fn detail(&self) -> Option<String> {
1761        match self {
1762            Self::InsufficientReplicationSlotsAvailable { count } => Some(format!(
1763                "executing this statement requires {} replication slot{}",
1764                count,
1765                if *count == 1 { "" } else { "s" }
1766            )),
1767            _ => None,
1768        }
1769    }
1770
1771    pub fn hint(&self) -> Option<String> {
1772        match self {
1773            Self::InsufficientReplicationSlotsAvailable { .. } => Some(
1774                "you might be able to wait for other sources to finish snapshotting and try again"
1775                    .into(),
1776            ),
1777            Self::ReplicationDisabled => Some("set max_wal_senders to a value > 0".into()),
1778            _ => None,
1779        }
1780    }
1781}
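
// A quick check of the pluralization in `detail` above; the expected strings
// are derived directly from its format string.
#[cfg(test)]
mod postgres_validation_error_tests {
    use super::*;

    #[test]
    fn insufficient_slots_detail_pluralizes() {
        let one = PostgresConnectionValidationError::InsufficientReplicationSlotsAvailable {
            count: 1,
        };
        assert_eq!(
            one.detail().unwrap(),
            "executing this statement requires 1 replication slot"
        );
        let two = PostgresConnectionValidationError::InsufficientReplicationSlotsAvailable {
            count: 2,
        };
        assert_eq!(
            two.detail().unwrap(),
            "executing this statement requires 2 replication slots"
        );
    }
}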
1782
1783impl<C: ConnectionAccess> AlterCompatible for PostgresConnection<C> {
1784    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1785        let PostgresConnection {
1786            tunnel,
1787            // All non-tunnel options may change arbitrarily
1788            host: _,
1789            port: _,
1790            database: _,
1791            user: _,
1792            password: _,
1793            tls_mode: _,
1794            tls_root_cert: _,
1795            tls_identity: _,
1796        } = self;
1797
1798        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1799
1800        for (compatible, field) in compatibility_checks {
1801            if !compatible {
1802                tracing::warn!(
1803                    "PostgresConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1804                    self,
1805                    other
1806                );
1807
1808                return Err(AlterError { id });
1809            }
1810        }
1811        Ok(())
1812    }
1813}
1814
1815/// Specifies how to tunnel a connection.
1816#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
1817pub enum Tunnel<C: ConnectionAccess = InlinedConnection> {
1818    /// No tunneling.
1819    Direct,
1820    /// Via the specified SSH tunnel connection.
1821    Ssh(SshTunnel<C>),
1822    /// Via the specified AWS PrivateLink connection.
1823    AwsPrivatelink(AwsPrivatelink),
1824}
1825
1826impl<R: ConnectionResolver> IntoInlineConnection<Tunnel, R> for Tunnel<ReferencedConnection> {
1827    fn into_inline_connection(self, r: R) -> Tunnel {
1828        match self {
1829            Tunnel::Direct => Tunnel::Direct,
1830            Tunnel::Ssh(ssh) => Tunnel::Ssh(ssh.into_inline_connection(r)),
1831            Tunnel::AwsPrivatelink(awspl) => Tunnel::AwsPrivatelink(awspl),
1832        }
1833    }
1834}
1835
1836impl<C: ConnectionAccess> AlterCompatible for Tunnel<C> {
1837    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1838        let compatible = match (self, other) {
1839            (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o).is_ok(),
1840            (s, o) => s == o,
1841        };
1842
1843        if !compatible {
1844            tracing::warn!(
1845                "Tunnel incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
1846                self,
1847                other
1848            );
1849
1850            return Err(AlterError { id });
1851        }
1852
1853        Ok(())
1854    }
1855}
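
// A sketch of the compatibility rules above: the tunnel *type* may not change
// across an `ALTER`. Assumes `GlobalId::User` and `CatalogItemId::User` are
// constructible unit-tuple variants.
#[cfg(test)]
mod tunnel_alter_tests {
    use super::*;

    #[test]
    fn direct_to_direct_is_compatible() {
        let id = GlobalId::User(1);
        let tunnel = Tunnel::<InlinedConnection>::Direct;
        assert!(tunnel.alter_compatible(id, &Tunnel::Direct).is_ok());
    }

    #[test]
    fn direct_to_privatelink_is_incompatible() {
        let id = GlobalId::User(1);
        let privatelink = Tunnel::<InlinedConnection>::AwsPrivatelink(AwsPrivatelink {
            connection_id: CatalogItemId::User(1),
            availability_zone: None,
            port: None,
        });
        assert!(Tunnel::Direct.alter_compatible(id, &privatelink).is_err());
    }
}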
1856
1857/// Specifies which MySQL SSL Mode to use:
1858/// <https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-mode>
1859/// This is not available as an enum in the mysql-async crate, so we define our own.
1860#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
1861pub enum MySqlSslMode {
1862    Disabled,
1863    Required,
1864    VerifyCa,
1865    VerifyIdentity,
1866}
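
// How each mode maps onto the opt-out `mysql_async::SslOpts` used in
// `MySqlConnection::config` below:
//
//   Disabled       -> no `SslOpts` at all (TLS disabled)
//   Required       -> accept invalid certs and skip domain validation
//   VerifyCa       -> verify the CA, but skip domain validation
//   VerifyIdentity -> full verification (the `SslOpts` default)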
1867
1868/// A connection to a MySQL server.
1869#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
1870pub struct MySqlConnection<C: ConnectionAccess = InlinedConnection> {
1871    /// The hostname of the server.
1872    pub host: String,
1873    /// The port of the server.
1874    pub port: u16,
1875    /// The username to authenticate as.
1876    pub user: StringOrSecret,
1877    /// An optional password for authentication.
1878    pub password: Option<CatalogItemId>,
1879    /// A tunnel through which to route traffic.
1880    pub tunnel: Tunnel<C>,
1881    /// Whether to use TLS for encryption, and whether to verify the server's certificate and identity.
1882    pub tls_mode: MySqlSslMode,
1883    /// An optional root TLS certificate in PEM format, to verify the server's
1884    /// identity.
1885    pub tls_root_cert: Option<StringOrSecret>,
1886    /// An optional TLS client certificate for authentication.
1887    pub tls_identity: Option<TlsIdentity>,
1888    /// Reference to the AWS connection information to be used for IAM authentication and
1889    /// assuming AWS roles.
1890    pub aws_connection: Option<AwsConnectionReference<C>>,
1891}
1892
1893impl<R: ConnectionResolver> IntoInlineConnection<MySqlConnection, R>
1894    for MySqlConnection<ReferencedConnection>
1895{
1896    fn into_inline_connection(self, r: R) -> MySqlConnection {
1897        let MySqlConnection {
1898            host,
1899            port,
1900            user,
1901            password,
1902            tunnel,
1903            tls_mode,
1904            tls_root_cert,
1905            tls_identity,
1906            aws_connection,
1907        } = self;
1908
1909        MySqlConnection {
1910            host,
1911            port,
1912            user,
1913            password,
1914            tunnel: tunnel.into_inline_connection(&r),
1915            tls_mode,
1916            tls_root_cert,
1917            tls_identity,
1918            aws_connection: aws_connection.map(|aws| aws.into_inline_connection(&r)),
1919        }
1920    }
1921}
1922
1923impl<C: ConnectionAccess> MySqlConnection<C> {
1924    fn validate_by_default(&self) -> bool {
1925        true
1926    }
1927}
1928
1929impl MySqlConnection<InlinedConnection> {
1930    pub async fn config(
1931        &self,
1932        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
1933        storage_configuration: &StorageConfiguration,
1934        in_task: InTask,
1935    ) -> Result<mz_mysql_util::Config, anyhow::Error> {
1936        // TODO(roshan): Set appropriate connection timeouts
1937        let mut opts = mysql_async::OptsBuilder::default()
1938            .ip_or_hostname(&self.host)
1939            .tcp_port(self.port)
1940            .user(Some(&self.user.get_string(in_task, secrets_reader).await?));
1941
1942        if let Some(password) = self.password {
1943            let password = secrets_reader
1944                .read_string_in_task_if(in_task, password)
1945                .await?;
1946            opts = opts.pass(Some(password));
1947        }
1948
1949        // Our `MySqlSslMode` enum matches the official MySQL client's `--ssl-mode` parameter values,
1950        // which use opt-in security features (SSL, CA verification, and identity verification).
1951        // The mysql_async crate `SslOpts` struct uses an opt-out mechanism for each of these, so
1952        // we need to appropriately disable features to match the intent of each enum value.
1953        let mut ssl_opts = match self.tls_mode {
1954            MySqlSslMode::Disabled => None,
1955            MySqlSslMode::Required => Some(
1956                mysql_async::SslOpts::default()
1957                    .with_danger_accept_invalid_certs(true)
1958                    .with_danger_skip_domain_validation(true),
1959            ),
1960            MySqlSslMode::VerifyCa => {
1961                Some(mysql_async::SslOpts::default().with_danger_skip_domain_validation(true))
1962            }
1963            MySqlSslMode::VerifyIdentity => Some(mysql_async::SslOpts::default()),
1964        };
1965
1966        if matches!(
1967            self.tls_mode,
1968            MySqlSslMode::VerifyCa | MySqlSslMode::VerifyIdentity
1969        ) {
1970            if let Some(tls_root_cert) = &self.tls_root_cert {
1971                let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
1972                ssl_opts = ssl_opts.map(|opts| {
1973                    opts.with_root_certs(vec![tls_root_cert.as_bytes().to_vec().into()])
1974                });
1975            }
1976        }
1977
1978        if let Some(identity) = &self.tls_identity {
1979            let key = secrets_reader
1980                .read_string_in_task_if(in_task, identity.key)
1981                .await?;
1982            let cert = identity.cert.get_string(in_task, secrets_reader).await?;
1983            let Pkcs12Archive { der, pass } =
1984                mz_tls_util::pkcs12der_from_pem(key.as_bytes(), cert.as_bytes())?;
1985
1986            // Add the client identity to `SslOpts`.
1987            ssl_opts = ssl_opts.map(|opts| {
1988                opts.with_client_identity(Some(
1989                    mysql_async::ClientIdentity::new(der.into()).with_password(pass),
1990                ))
1991            });
1992        }
1993
1994        opts = opts.ssl_opts(ssl_opts);
1995
1996        let tunnel = match &self.tunnel {
1997            Tunnel::Direct => {
1998                // Ensure any host we connect to is resolved to an external address.
1999                let resolved = resolve_address(
2000                    &self.host,
2001                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2002                )
2003                .await?;
2004                mz_mysql_util::TunnelConfig::Direct {
2005                    resolved_ips: Some(resolved),
2006                }
2007            }
2008            Tunnel::Ssh(SshTunnel {
2009                connection_id,
2010                connection,
2011            }) => {
2012                let secret = secrets_reader
2013                    .read_in_task_if(in_task, *connection_id)
2014                    .await?;
2015                let key_pair = SshKeyPair::from_bytes(&secret)?;
2016                // Ensure any ssh-bastion host we connect to is resolved to an external address.
2017                let resolved = resolve_address(
2018                    &connection.host,
2019                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2020                )
2021                .await?;
2022                mz_mysql_util::TunnelConfig::Ssh {
2023                    config: SshTunnelConfig {
2024                        host: resolved
2025                            .iter()
2026                            .map(|a| a.to_string())
2027                            .collect::<BTreeSet<_>>(),
2028                        port: connection.port,
2029                        user: connection.user.clone(),
2030                        key_pair,
2031                    },
2032                }
2033            }
2034            Tunnel::AwsPrivatelink(connection) => {
2035                assert_none!(connection.port);
2036                mz_mysql_util::TunnelConfig::AwsPrivatelink {
2037                    connection_id: connection.connection_id,
2038                }
2039            }
2040        };
2041
2042        let aws_config = match self.aws_connection.as_ref() {
2043            None => None,
2044            Some(aws_ref) => Some(
2045                aws_ref
2046                    .connection
2047                    .load_sdk_config(
2048                        &storage_configuration.connection_context,
2049                        aws_ref.connection_id,
2050                        in_task,
2051                    )
2052                    .await?,
2053            ),
2054        };
2055
2056        Ok(mz_mysql_util::Config::new(
2057            opts,
2058            tunnel,
2059            storage_configuration.parameters.ssh_timeout_config,
2060            in_task,
2061            storage_configuration
2062                .parameters
2063                .mysql_source_timeouts
2064                .clone(),
2065            aws_config,
2066        )?)
2067    }
2068
2069    pub async fn validate(
2070        &self,
2071        _id: CatalogItemId,
2072        storage_configuration: &StorageConfiguration,
2073    ) -> Result<MySqlConn, MySqlConnectionValidationError> {
2074        let config = self
2075            .config(
2076                &storage_configuration.connection_context.secrets_reader,
2077                storage_configuration,
2078                // We are already in a normal tokio context during validation.
2079                InTask::No,
2080            )
2081            .await?;
2082        let mut conn = config
2083            .connect(
2084                "connection validation",
2085                &storage_configuration.connection_context.ssh_tunnel_manager,
2086            )
2087            .await?;
2088
2089        // Check that the MySQL database is configured to allow row-based, consistent GTID replication.
2090        let mut setting_errors = vec![];
2091        let gtid_res = mz_mysql_util::ensure_gtid_consistency(&mut conn).await;
2092        let binlog_res = mz_mysql_util::ensure_full_row_binlog_format(&mut conn).await;
2093        let order_res = mz_mysql_util::ensure_replication_commit_order(&mut conn).await;
2094        for res in [gtid_res, binlog_res, order_res] {
2095            match res {
2096                Err(MySqlError::InvalidSystemSetting {
2097                    setting,
2098                    expected,
2099                    actual,
2100                }) => {
2101                    setting_errors.push((setting, expected, actual));
2102                }
2103                Err(err) => Err(err)?,
2104                Ok(()) => {}
2105            }
2106        }
2107        if !setting_errors.is_empty() {
2108            Err(MySqlConnectionValidationError::ReplicationSettingsError(
2109                setting_errors,
2110            ))?;
2111        }
2112
2113        Ok(conn)
2114    }
2115}
2116
2117#[derive(Debug, thiserror::Error)]
2118pub enum MySqlConnectionValidationError {
2119    #[error("Invalid MySQL system replication settings")]
2120    ReplicationSettingsError(Vec<(String, String, String)>),
2121    #[error(transparent)]
2122    Client(#[from] MySqlError),
2123    #[error("{}", .0.display_with_causes())]
2124    Other(#[from] anyhow::Error),
2125}
2126
2127impl MySqlConnectionValidationError {
2128    pub fn detail(&self) -> Option<String> {
2129        match self {
2130            Self::ReplicationSettingsError(settings) => Some(format!(
2131                "Invalid MySQL system replication settings: {}",
2132                itertools::join(
2133                    settings.iter().map(|(setting, expected, actual)| format!(
2134                        "{}: expected {}, got {}",
2135                        setting, expected, actual
2136                    )),
2137                    "; "
2138                )
2139            )),
2140            _ => None,
2141        }
2142    }
2143
2144    pub fn hint(&self) -> Option<String> {
2145        match self {
2146            Self::ReplicationSettingsError(_) => {
2147                Some("Set the necessary MySQL database system settings.".into())
2148            }
2149            _ => None,
2150        }
2151    }
2152}
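
// A small check that `detail` joins multiple offending settings with "; ",
// matching the format string above (setting names here are illustrative).
#[cfg(test)]
mod mysql_validation_error_tests {
    use super::*;

    #[test]
    fn replication_settings_detail_joins_settings() {
        let err = MySqlConnectionValidationError::ReplicationSettingsError(vec![
            ("gtid_mode".into(), "ON".into(), "OFF".into()),
            ("binlog_format".into(), "ROW".into(), "STATEMENT".into()),
        ]);
        assert_eq!(
            err.detail().unwrap(),
            "Invalid MySQL system replication settings: \
             gtid_mode: expected ON, got OFF; \
             binlog_format: expected ROW, got STATEMENT"
        );
    }
}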
2153
2154impl<C: ConnectionAccess> AlterCompatible for MySqlConnection<C> {
2155    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2156        let MySqlConnection {
2157            tunnel,
2158            // All non-tunnel options may change arbitrarily
2159            host: _,
2160            port: _,
2161            user: _,
2162            password: _,
2163            tls_mode: _,
2164            tls_root_cert: _,
2165            tls_identity: _,
2166            aws_connection: _,
2167        } = self;
2168
2169        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2170
2171        for (compatible, field) in compatibility_checks {
2172            if !compatible {
2173                tracing::warn!(
2174                    "MySqlConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2175                    self,
2176                    other
2177                );
2178
2179                return Err(AlterError { id });
2180            }
2181        }
2182        Ok(())
2183    }
2184}
2185
2186/// Details how to connect to an instance of Microsoft SQL Server.
2187///
2188/// For specifics of connecting to SQL Server for purposes of creating a
2189/// Materialize Source, see [`SqlServerSourceConnection`] which wraps this type.
2190///
2191/// [`SqlServerSourceConnection`]: crate::sources::SqlServerSourceConnection
2192#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2193pub struct SqlServerConnectionDetails<C: ConnectionAccess = InlinedConnection> {
2194    /// The hostname of the server.
2195    pub host: String,
2196    /// The port of the server.
2197    pub port: u16,
2198    /// The database to connect to.
2199    pub database: String,
2200    /// The username to authenticate as.
2201    pub user: StringOrSecret,
2202    /// Password used for authentication.
2203    pub password: CatalogItemId,
2204    /// A tunnel through which to route traffic.
2205    pub tunnel: Tunnel<C>,
2206    /// Level of encryption to use for the connection.
2207    pub encryption: mz_sql_server_util::config::EncryptionLevel,
2208    /// The certificate validation policy.
2209    pub certificate_validation_policy: mz_sql_server_util::config::CertificateValidationPolicy,
2210    /// The TLS CA certificate in PEM format.
2211    pub tls_root_cert: Option<StringOrSecret>,
2212}
2213
2214impl<C: ConnectionAccess> SqlServerConnectionDetails<C> {
2215    fn validate_by_default(&self) -> bool {
2216        true
2217    }
2218}
2219
2220impl SqlServerConnectionDetails<InlinedConnection> {
2221    /// Attempts to open a connection to the upstream SQL Server instance.
2222    pub async fn validate(
2223        &self,
2224        _id: CatalogItemId,
2225        storage_configuration: &StorageConfiguration,
2226    ) -> Result<mz_sql_server_util::Client, anyhow::Error> {
2227        let config = self
2228            .resolve_config(
2229                &storage_configuration.connection_context.secrets_reader,
2230                storage_configuration,
2231                InTask::No,
2232            )
2233            .await?;
2234        tracing::debug!(?config, "Validating SQL Server connection");
2235
2236        let mut client = mz_sql_server_util::Client::connect(config).await?;
2237
2238        // Ensure the upstream SQL Server instance is configured to allow CDC.
2239        //
2240        // Run all of the necessary checks and collect the errors to provide the best
2241        // guidance as to which system settings need to be enabled.
2242        let mut replication_errors = vec![];
2243        for error in [
2244            mz_sql_server_util::inspect::ensure_database_cdc_enabled(&mut client).await,
2245            mz_sql_server_util::inspect::ensure_snapshot_isolation_enabled(&mut client).await,
2246            mz_sql_server_util::inspect::ensure_sql_server_agent_running(&mut client).await,
2247        ] {
2248            match error {
2249                Err(mz_sql_server_util::SqlServerError::InvalidSystemSetting {
2250                    name,
2251                    expected,
2252                    actual,
2253                }) => replication_errors.push((name, expected, actual)),
2254                Err(other) => Err(other)?,
2255                Ok(()) => (),
2256            }
2257        }
2258        if !replication_errors.is_empty() {
2259            Err(SqlServerConnectionValidationError::ReplicationSettingsError(replication_errors))?;
2260        }
2261
2262        Ok(client)
2263    }
2264
2265    /// Resolve all of the connection details (e.g. read from the [`SecretsReader`])
2266    /// so the returned [`Config`] can be used to open a connection with the
2267    /// upstream system.
2268    ///
2269    /// The provided [`InTask`] argument determines whether any I/O is run in an
2270    /// [`mz_ore::task`] (i.e. a different thread) or directly in the returned
2271    /// future. The main goal here is to prevent running I/O in timely threads.
2272    ///
2273    /// [`Config`]: mz_sql_server_util::Config
2274    pub async fn resolve_config(
2275        &self,
2276        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
2277        storage_configuration: &StorageConfiguration,
2278        in_task: InTask,
2279    ) -> Result<mz_sql_server_util::Config, anyhow::Error> {
2280        let dyncfg = storage_configuration.config_set();
2281        let mut inner_config = tiberius::Config::new();
2282
2283        // Set up the default connection params.
2284        inner_config.host(&self.host);
2285        inner_config.port(self.port);
2286        inner_config.database(self.database.clone());
2287        inner_config.encryption(self.encryption.into());
2288        match self.certificate_validation_policy {
2289            mz_sql_server_util::config::CertificateValidationPolicy::TrustAll => {
2290                inner_config.trust_cert()
2291            }
2292            mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA => {
2293                inner_config.trust_cert_ca_pem(
2294                    self.tls_root_cert
2295                        .as_ref()
2296                        .unwrap()
2297                        .get_string(in_task, secrets_reader)
2298                        .await
2299                        .context("ca certificate")?,
2300                );
2301            }
2302            mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem => (), // no-op
2303        }
2304
2305        inner_config.application_name("materialize");
2306
2307        // Read our auth settings from the secrets reader.
2308        let user = self
2309            .user
2310            .get_string(in_task, secrets_reader)
2311            .await
2312            .context("username")?;
2313        let password = secrets_reader
2314            .read_string_in_task_if(in_task, self.password)
2315            .await
2316            .context("password")?;
2317        // TODO(sql_server3): Support other methods of authentication besides
2318        // username and password.
2319        inner_config.authentication(tiberius::AuthMethod::sql_server(user, password));
2320
2321        // Prevent users from probing our internal network ports by trying to
2322        // connect to localhost, or another non-external IP.
2323        let enforce_external_addresses = ENFORCE_EXTERNAL_ADDRESSES.get(dyncfg);
2324
2325        let tunnel = match &self.tunnel {
2326            Tunnel::Direct => mz_sql_server_util::config::TunnelConfig::Direct,
2327            Tunnel::Ssh(SshTunnel {
2328                connection_id,
2329                connection: ssh_connection,
2330            }) => {
2331                let secret = secrets_reader
2332                    .read_in_task_if(in_task, *connection_id)
2333                    .await
2334                    .context("ssh secret")?;
2335                let key_pair = SshKeyPair::from_bytes(&secret).context("ssh key pair")?;
2336                // Ensure any SSH-bastion host we connect to is resolved to an
2337                // external address.
2338                let addresses = resolve_address(&ssh_connection.host, enforce_external_addresses)
2339                    .await
2340                    .context("ssh tunnel")?;
2341
2342                let config = SshTunnelConfig {
2343                    host: addresses.into_iter().map(|a| a.to_string()).collect(),
2344                    port: ssh_connection.port,
2345                    user: ssh_connection.user.clone(),
2346                    key_pair,
2347                };
2348                mz_sql_server_util::config::TunnelConfig::Ssh {
2349                    config,
2350                    manager: storage_configuration
2351                        .connection_context
2352                        .ssh_tunnel_manager
2353                        .clone(),
2354                    timeout: storage_configuration.parameters.ssh_timeout_config.clone(),
2355                    host: self.host.clone(),
2356                    port: self.port,
2357                }
2358            }
2359            Tunnel::AwsPrivatelink(private_link_connection) => {
2360                assert_none!(private_link_connection.port);
2361                mz_sql_server_util::config::TunnelConfig::AwsPrivatelink {
2362                    connection_id: private_link_connection.connection_id,
2363                    port: self.port,
2364                }
2365            }
2366        };
2367
2368        Ok(mz_sql_server_util::Config::new(
2369            inner_config,
2370            tunnel,
2371            in_task,
2372        ))
2373    }
2374}
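
// A sketch of typical use, mirroring `validate` above (variable names are
// hypothetical):
//
//     let config = details
//         .resolve_config(&secrets_reader, &storage_configuration, InTask::No)
//         .await?;
//     let client = mz_sql_server_util::Client::connect(config).await?;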
2375
2376#[derive(Debug, Clone, thiserror::Error)]
2377pub enum SqlServerConnectionValidationError {
2378    #[error("Invalid SQL Server system replication settings")]
2379    ReplicationSettingsError(Vec<(String, String, String)>),
2380}
2381
2382impl SqlServerConnectionValidationError {
2383    pub fn detail(&self) -> Option<String> {
2384        match self {
2385            Self::ReplicationSettingsError(settings) => Some(format!(
2386                "Invalid SQL Server system replication settings: {}",
2387                itertools::join(
2388                    settings.iter().map(|(setting, expected, actual)| format!(
2389                        "{}: expected {}, got {}",
2390                        setting, expected, actual
2391                    )),
2392                    "; "
2393                )
2394            )),
2395        }
2396    }
2397
2398    pub fn hint(&self) -> Option<String> {
2399        None
2400    }
2403}
2404
2405impl<R: ConnectionResolver> IntoInlineConnection<SqlServerConnectionDetails, R>
2406    for SqlServerConnectionDetails<ReferencedConnection>
2407{
2408    fn into_inline_connection(self, r: R) -> SqlServerConnectionDetails {
2409        let SqlServerConnectionDetails {
2410            host,
2411            port,
2412            database,
2413            user,
2414            password,
2415            tunnel,
2416            encryption,
2417            certificate_validation_policy,
2418            tls_root_cert,
2419        } = self;
2420
2421        SqlServerConnectionDetails {
2422            host,
2423            port,
2424            database,
2425            user,
2426            password,
2427            tunnel: tunnel.into_inline_connection(&r),
2428            encryption,
2429            certificate_validation_policy,
2430            tls_root_cert,
2431        }
2432    }
2433}
2434
2435impl<C: ConnectionAccess> AlterCompatible for SqlServerConnectionDetails<C> {
2436    fn alter_compatible(
2437        &self,
2438        id: mz_repr::GlobalId,
2439        other: &Self,
2440    ) -> Result<(), crate::controller::AlterError> {
2441        let SqlServerConnectionDetails {
2442            tunnel,
2443            // TODO(sql_server2): Figure out how these variables are allowed to change.
2444            host: _,
2445            port: _,
2446            database: _,
2447            user: _,
2448            password: _,
2449            encryption: _,
2450            certificate_validation_policy: _,
2451            tls_root_cert: _,
2452        } = self;
2453
2454        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2455
2456        for (compatible, field) in compatibility_checks {
2457            if !compatible {
2458                tracing::warn!(
2459                    "SqlServerConnectionDetails incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2460                    self,
2461                    other
2462                );
2463
2464                return Err(AlterError { id });
2465            }
2466        }
2467        Ok(())
2468    }
2469}
2470
2471/// A connection to an SSH tunnel.
2472#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2473pub struct SshConnection {
2474    pub host: String,
2475    pub port: u16,
2476    pub user: String,
2477}
2478
2479use self::inline::{
2480    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
2481    ReferencedConnection,
2482};
2483
2484impl AlterCompatible for SshConnection {
2485    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
2486        // Every element of the SSH connection is configurable.
2487        Ok(())
2488    }
2489}
2490
2491/// Specifies an AWS PrivateLink service for a [`Tunnel`].
2492#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2493pub struct AwsPrivatelink {
2494    /// The ID of the connection to the AWS PrivateLink service.
2495    pub connection_id: CatalogItemId,
2496    /// The availability zone to use when connecting to the AWS PrivateLink service.
2497    pub availability_zone: Option<String>,
2498    /// The port to use when connecting to the AWS PrivateLink service, if
2499    /// different from the port in [`KafkaBroker::address`].
2500    pub port: Option<u16>,
2501}
2502
2503impl AlterCompatible for AwsPrivatelink {
2504    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2505        let AwsPrivatelink {
2506            connection_id,
2507            availability_zone: _,
2508            port: _,
2509        } = self;
2510
2511        let compatibility_checks = [(connection_id == &other.connection_id, "connection_id")];
2512
2513        for (compatible, field) in compatibility_checks {
2514            if !compatible {
2515                tracing::warn!(
2516                    "AwsPrivatelink incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2517                    self,
2518                    other
2519                );
2520
2521                return Err(AlterError { id });
2522            }
2523        }
2524
2525        Ok(())
2526    }
2527}
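
// The rules above in test form: the availability zone and port of a
// PrivateLink tunnel may change, but the referenced connection may not.
// Assumes `CatalogItemId::User` and `GlobalId::User` are constructible
// unit-tuple variants.
#[cfg(test)]
mod aws_privatelink_alter_tests {
    use super::*;

    #[test]
    fn only_connection_id_is_pinned() {
        let id = GlobalId::User(1);
        let base = AwsPrivatelink {
            connection_id: CatalogItemId::User(1),
            availability_zone: Some("use1-az1".into()),
            port: None,
        };

        // Changing the availability zone or port is fine...
        let mut other = base.clone();
        other.availability_zone = None;
        other.port = Some(9092);
        assert!(base.alter_compatible(id, &other).is_ok());

        // ...but changing the connection ID is not.
        let mut other = base.clone();
        other.connection_id = CatalogItemId::User(2);
        assert!(base.alter_compatible(id, &other).is_err());
    }
}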
2528
2529/// Specifies an SSH tunnel connection.
2530#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2531pub struct SshTunnel<C: ConnectionAccess = InlinedConnection> {
2532    /// The ID of the SSH connection.
2533    pub connection_id: CatalogItemId,
2534    /// The SSH connection itself.
2535    pub connection: C::Ssh,
2536}
2537
2538impl<R: ConnectionResolver> IntoInlineConnection<SshTunnel, R> for SshTunnel<ReferencedConnection> {
2539    fn into_inline_connection(self, r: R) -> SshTunnel {
2540        let SshTunnel {
2541            connection,
2542            connection_id,
2543        } = self;
2544
2545        SshTunnel {
2546            connection: r.resolve_connection(connection).unwrap_ssh(),
2547            connection_id,
2548        }
2549    }
2550}
2551
2552impl SshTunnel<InlinedConnection> {
2553    /// Like [`SshTunnelConfig::connect`], but the SSH key is loaded from a
2554    /// secret.
2555    async fn connect(
2556        &self,
2557        storage_configuration: &StorageConfiguration,
2558        remote_host: &str,
2559        remote_port: u16,
2560        in_task: InTask,
2561    ) -> Result<ManagedSshTunnelHandle, anyhow::Error> {
2562        // Ensure any ssh-bastion host we connect to is resolved to an external address.
2563        let resolved = resolve_address(
2564            &self.connection.host,
2565            ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2566        )
2567        .await?;
2568        storage_configuration
2569            .connection_context
2570            .ssh_tunnel_manager
2571            .connect(
2572                SshTunnelConfig {
2573                    host: resolved
2574                        .iter()
2575                        .map(|a| a.to_string())
2576                        .collect::<BTreeSet<_>>(),
2577                    port: self.connection.port,
2578                    user: self.connection.user.clone(),
2579                    key_pair: SshKeyPair::from_bytes(
2580                        &storage_configuration
2581                            .connection_context
2582                            .secrets_reader
2583                            .read_in_task_if(in_task, self.connection_id)
2584                            .await?,
2585                    )?,
2586                },
2587                remote_host,
2588                remote_port,
2589                storage_configuration.parameters.ssh_timeout_config,
2590                in_task,
2591            )
2592            .await
2593    }
2594}
2595
2596impl<C: ConnectionAccess> AlterCompatible for SshTunnel<C> {
2597    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2598        let SshTunnel {
2599            connection_id,
2600            connection,
2601        } = self;
2602
2603        let compatibility_checks = [
2604            (connection_id == &other.connection_id, "connection_id"),
2605            (
2606                connection.alter_compatible(id, &other.connection).is_ok(),
2607                "connection",
2608            ),
2609        ];
2610
2611        for (compatible, field) in compatibility_checks {
2612            if !compatible {
2613                tracing::warn!(
2614                    "SshTunnel incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2615                    self,
2616                    other
2617                );
2618
2619                return Err(AlterError { id });
2620            }
2621        }
2622
2623        Ok(())
2624    }
2625}
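
// A sketch of the checks above. For `InlinedConnection`, the tunnel's
// `connection` field is a plain `SshConnection` (an assumption about the
// `ConnectionAccess` implementation in the `inline` module); host and user
// values are hypothetical.
#[cfg(test)]
mod ssh_tunnel_alter_tests {
    use super::*;

    #[test]
    fn changing_ssh_connection_id_is_incompatible() {
        let id = GlobalId::User(1);
        let tunnel = SshTunnel::<InlinedConnection> {
            connection_id: CatalogItemId::User(1),
            connection: SshConnection {
                host: "bastion.example.com".into(),
                port: 22,
                user: "mz".into(),
            },
        };
        let mut other = tunnel.clone();
        other.connection_id = CatalogItemId::User(2);
        assert!(tunnel.alter_compatible(id, &other).is_err());
    }
}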
2626
2627impl SshConnection {
2628    #[allow(clippy::unused_async)]
2629    async fn validate(
2630        &self,
2631        id: CatalogItemId,
2632        storage_configuration: &StorageConfiguration,
2633    ) -> Result<(), anyhow::Error> {
2634        let secret = storage_configuration
2635            .connection_context
2636            .secrets_reader
2637            .read_in_task_if(
2638                // We are already in a normal tokio context during validation.
2639                InTask::No,
2640                id,
2641            )
2642            .await?;
2643        let key_pair = SshKeyPair::from_bytes(&secret)?;
2644
2645        // Ensure any ssh-bastion host we connect to is resolved to an external address.
2646        let resolved = resolve_address(
2647            &self.host,
2648            ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2649        )
2650        .await?;
2651
2652        let config = SshTunnelConfig {
2653            host: resolved
2654                .iter()
2655                .map(|a| a.to_string())
2656                .collect::<BTreeSet<_>>(),
2657            port: self.port,
2658            user: self.user.clone(),
2659            key_pair,
2660        };
2661        // Note that we do NOT use the `SshTunnelManager` here, as we want to validate that we
2662        // can actually create a new connection to the ssh bastion, without tunneling.
2663        config
2664            .validate(storage_configuration.parameters.ssh_timeout_config)
2665            .await
2666    }
2667
2668    fn validate_by_default(&self) -> bool {
2669        false
2670    }
2671}
2672
2673impl AwsPrivatelinkConnection {
2674    #[allow(clippy::unused_async)]
2675    async fn validate(
2676        &self,
2677        id: CatalogItemId,
2678        storage_configuration: &StorageConfiguration,
2679    ) -> Result<(), anyhow::Error> {
2680        let Some(ref cloud_resource_reader) = storage_configuration
2681            .connection_context
2682            .cloud_resource_reader
2683        else {
2684            return Err(anyhow!("AWS PrivateLink connections are unsupported"));
2685        };
2686
2687        // No need to optionally run this in a task, as we are just validating from envd.
2688        let status = cloud_resource_reader.read(id).await?;
2689
2690        let availability = status
2691            .conditions
2692            .as_ref()
2693            .and_then(|conditions| conditions.iter().find(|c| c.type_ == "Available"));
2694
2695        match availability {
2696            Some(condition) if condition.status == "True" => Ok(()),
2697            Some(condition) => Err(anyhow!("{}", condition.message)),
2698            None => Err(anyhow!("Endpoint availability is unknown")),
2699        }
2700    }
2701
2702    fn validate_by_default(&self) -> bool {
2703        false
2704    }
2705}