Skip to main content

mz_storage_types/
connections.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Connection types.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::net::SocketAddr;
15use std::sync::Arc;
16
17use anyhow::{Context, anyhow};
18use async_trait::async_trait;
19use aws_credential_types::provider::ProvideCredentials;
20use iceberg::Catalog;
21use iceberg::CatalogBuilder;
22use iceberg::io::{
23    AwsCredential, AwsCredentialLoad, CustomAwsCredentialLoader, S3_ACCESS_KEY_ID,
24    S3_DISABLE_EC2_METADATA, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use iceberg_catalog_rest::{
27    REST_CATALOG_PROP_URI, REST_CATALOG_PROP_WAREHOUSE, RestCatalogBuilder,
28};
29use itertools::Itertools;
30use mz_ccsr::tls::{Certificate, Identity};
31use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceReader, vpc_endpoint_host};
32use mz_dyncfg::ConfigSet;
33use mz_kafka_util::client::{
34    BrokerAddr, BrokerRewrite, HostMappingRules, MzClientContext, MzKafkaError, TunnelConfig,
35    TunnelingClientContext,
36};
37use mz_mysql_util::{MySqlConn, MySqlError};
38use mz_ore::assert_none;
39use mz_ore::error::ErrorExt;
40use mz_ore::future::{InTask, OreFutureExt};
41use mz_ore::netio::resolve_address;
42use mz_ore::num::NonNeg;
43use mz_repr::{CatalogItemId, GlobalId};
44use mz_secrets::SecretsReader;
45use mz_sql_parser::ast::ConnectionRulePattern;
46use mz_ssh_util::keys::SshKeyPair;
47use mz_ssh_util::tunnel::SshTunnelConfig;
48use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager};
49use mz_tracing::CloneableEnvFilter;
50use rdkafka::ClientContext;
51use rdkafka::config::FromClientConfigAndContext;
52use rdkafka::consumer::{BaseConsumer, Consumer};
53use regex::Regex;
54use serde::{Deserialize, Deserializer, Serialize};
55use tokio::net;
56use tokio::runtime::Handle;
57use tokio_postgres::config::SslMode;
58use tracing::{debug, info, warn};
59use url::Url;
60
61use crate::AlterCompatible;
62use crate::configuration::StorageConfiguration;
63use crate::connections::aws::{
64    AwsAuth, AwsConnection, AwsConnectionReference, AwsConnectionValidationError,
65};
66use crate::connections::string_or_secret::StringOrSecret;
67use crate::controller::AlterError;
68use crate::dyncfgs::{
69    ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES,
70    KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM, KAFKA_RECONNECT_BACKOFF,
71    KAFKA_RECONNECT_BACKOFF_MAX, KAFKA_RETRY_BACKOFF, KAFKA_RETRY_BACKOFF_MAX,
72};
73use crate::errors::{ContextCreationError, CsrConnectError};
74
75pub mod aws;
76pub mod inline;
77pub mod string_or_secret;
78
/// Property key under which the OAuth2 scope is passed to the Iceberg REST catalog builder.
const REST_CATALOG_PROP_SCOPE: &str = "scope";
/// Property key under which the OAuth2 credential (`CLIENT_ID:CLIENT_SECRET`) is passed to
/// the Iceberg REST catalog builder.
const REST_CATALOG_PROP_CREDENTIAL: &str = "credential";
81
/// A credential loader that wraps an aws-sdk-rust credentials provider for use with
/// iceberg/OpenDAL. This allows us to provide refreshable credentials from the AWS SDK
/// credential chain (including the full assume role chain) to OpenDAL's S3 implementation.
///
/// We use this instead of OpenDAL's built-in assume role support because Materialize
/// has a runtime-defined credential chain (ambient → jump role → user role with external ID)
/// that can't be expressed via OpenDAL's static configuration properties.
struct AwsSdkCredentialLoader {
    /// The underlying AWS SDK credentials provider. For assume role auth, this provider
    /// already handles the full chain: ambient creds -> jump role -> user role.
    provider: aws_credential_types::provider::SharedCredentialsProvider,
}

impl AwsSdkCredentialLoader {
    /// Wraps `provider` so it can be handed to iceberg's `CustomAwsCredentialLoader`.
    fn new(provider: aws_credential_types::provider::SharedCredentialsProvider) -> Self {
        Self { provider }
    }
}
100
101#[async_trait]
102impl AwsCredentialLoad for AwsSdkCredentialLoader {
103    async fn load_credential(
104        &self,
105        _client: reqwest::Client,
106    ) -> anyhow::Result<Option<AwsCredential>> {
107        let creds = self
108            .provider
109            .provide_credentials()
110            .await
111            .map_err(|e| {
112                warn!(
113                    error = %e.display_with_causes(),
114                    "failed to load AWS credentials for Iceberg FileIO from SDK provider"
115                );
116                e
117            })
118            .context(
119                "failed to load AWS credentials from SDK provider for Iceberg FileIO \
120                 (credential source may be temporarily unavailable)",
121            )?;
122
123        Ok(Some(AwsCredential {
124            access_key_id: creds.access_key_id().to_string(),
125            secret_access_key: creds.secret_access_key().to_string(),
126            session_token: creds.session_token().map(|s| s.to_string()),
127            expires_in: creds.expiry().map(|t| t.into()),
128        }))
129    }
130}
131
/// An extension trait for [`SecretsReader`]
///
/// Each method mirrors a `SecretsReader` read method but takes an [`InTask`] flag that
/// decides whether the read is run on a separate task (via `run_in_task_if`).
#[async_trait::async_trait]
trait SecretsReaderExt {
    /// `SecretsReader::read`, but optionally run in a task.
    async fn read_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<Vec<u8>, anyhow::Error>;

    /// `SecretsReader::read_string`, but optionally run in a task.
    async fn read_string_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<String, anyhow::Error>;
}
149
150#[async_trait::async_trait]
151impl SecretsReaderExt for Arc<dyn SecretsReader> {
152    async fn read_in_task_if(
153        &self,
154        in_task: InTask,
155        id: CatalogItemId,
156    ) -> Result<Vec<u8>, anyhow::Error> {
157        let sr = Arc::clone(self);
158        async move { sr.read(id).await }
159            .run_in_task_if(in_task, || "secrets_reader_read".to_string())
160            .await
161    }
162    async fn read_string_in_task_if(
163        &self,
164        in_task: InTask,
165        id: CatalogItemId,
166    ) -> Result<String, anyhow::Error> {
167        let sr = Arc::clone(self);
168        async move { sr.read_string(id).await }
169            .run_in_task_if(in_task, || "secrets_reader_read".to_string())
170            .await
171    }
172}
173
/// Extra context to pass through when instantiating a connection for a source
/// or sink.
///
/// Should be kept cheaply cloneable: the reader fields are `Arc`s, so clones share them.
#[derive(Debug, Clone)]
pub struct ConnectionContext {
    /// An opaque identifier for the environment in which this process is
    /// running.
    ///
    /// The storage layer is intentionally unaware of the structure within this
    /// identifier. Higher layers of the stack can make use of that structure,
    /// but the storage layer should be oblivious to it.
    pub environment_id: String,
    /// The level for librdkafka's logs. [`ConnectionContext::from_cli_args`] derives it
    /// from the level the startup log filter assigns to the `librdkafka` crate.
    pub librdkafka_log_level: tracing::Level,
    /// A prefix for an external ID to use for all AWS AssumeRole operations.
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    /// The ARN for a Materialize-controlled role to assume before assuming
    /// a customer's requested role for an AWS connection.
    pub aws_connection_role_arn: Option<String>,
    /// A secrets reader, used to resolve secret-backed values such as credentials.
    pub secrets_reader: Arc<dyn SecretsReader>,
    /// A cloud resource reader, if supported in this configuration.
    pub cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
    /// A manager for SSH tunnels.
    pub ssh_tunnel_manager: SshTunnelManager,
}
201
202impl ConnectionContext {
203    /// Constructs a new connection context from command line arguments.
204    ///
205    /// **WARNING:** it is critical for security that the `aws_external_id` be
206    /// provided by the operator of the Materialize service (i.e., via a CLI
207    /// argument or environment variable) and not the end user of Materialize
208    /// (e.g., via a configuration option in a SQL statement). See
209    /// [`AwsExternalIdPrefix`] for details.
210    pub fn from_cli_args(
211        environment_id: String,
212        startup_log_level: &CloneableEnvFilter,
213        aws_external_id_prefix: Option<AwsExternalIdPrefix>,
214        aws_connection_role_arn: Option<String>,
215        secrets_reader: Arc<dyn SecretsReader>,
216        cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
217    ) -> ConnectionContext {
218        ConnectionContext {
219            environment_id,
220            librdkafka_log_level: mz_ore::tracing::crate_level(
221                &startup_log_level.clone().into(),
222                "librdkafka",
223            ),
224            aws_external_id_prefix,
225            aws_connection_role_arn,
226            secrets_reader,
227            cloud_resource_reader,
228            ssh_tunnel_manager: SshTunnelManager::default(),
229        }
230    }
231
232    /// Constructs a new connection context for usage in tests.
233    pub fn for_tests(secrets_reader: Arc<dyn SecretsReader>) -> ConnectionContext {
234        ConnectionContext {
235            environment_id: "test-environment-id".into(),
236            librdkafka_log_level: tracing::Level::INFO,
237            aws_external_id_prefix: Some(
238                AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable(
239                    "test-aws-external-id-prefix",
240                )
241                .expect("infallible"),
242            ),
243            aws_connection_role_arn: Some(
244                "arn:aws:iam::123456789000:role/MaterializeConnection".into(),
245            ),
246            secrets_reader,
247            cloud_resource_reader: None,
248            ssh_tunnel_manager: SshTunnelManager::default(),
249        }
250    }
251}
252
/// A connection to an external system.
///
/// `C` selects how connections nested inside a variant are represented.
/// `Connection<ReferencedConnection>` is turned into the default
/// `Connection<InlinedConnection>` via [`IntoInlineConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Connection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaConnection<C>),
    Csr(CsrConnection<C>),
    GlueSchemaRegistry(GlueSchemaRegistryConnection<C>),
    Postgres(PostgresConnection<C>),
    Ssh(SshConnection),
    Aws(AwsConnection),
    AwsPrivatelink(AwsPrivatelinkConnection),
    MySql(MySqlConnection<C>),
    SqlServer(SqlServerConnectionDetails<C>),
    IcebergCatalog(IcebergCatalogConnection<C>),
}
266
267impl<R: ConnectionResolver> IntoInlineConnection<Connection, R>
268    for Connection<ReferencedConnection>
269{
270    fn into_inline_connection(self, r: R) -> Connection {
271        match self {
272            Connection::Kafka(kafka) => Connection::Kafka(kafka.into_inline_connection(r)),
273            Connection::Csr(csr) => Connection::Csr(csr.into_inline_connection(r)),
274            Connection::GlueSchemaRegistry(glue) => {
275                Connection::GlueSchemaRegistry(glue.into_inline_connection(r))
276            }
277            Connection::Postgres(pg) => Connection::Postgres(pg.into_inline_connection(r)),
278            Connection::Ssh(ssh) => Connection::Ssh(ssh),
279            Connection::Aws(aws) => Connection::Aws(aws),
280            Connection::AwsPrivatelink(awspl) => Connection::AwsPrivatelink(awspl),
281            Connection::MySql(mysql) => Connection::MySql(mysql.into_inline_connection(r)),
282            Connection::SqlServer(sql_server) => {
283                Connection::SqlServer(sql_server.into_inline_connection(r))
284            }
285            Connection::IcebergCatalog(iceberg) => {
286                Connection::IcebergCatalog(iceberg.into_inline_connection(r))
287            }
288        }
289    }
290}
291
292impl<C: ConnectionAccess> Connection<C> {
293    /// Whether this connection should be validated by default on creation.
294    pub fn validate_by_default(&self) -> bool {
295        match self {
296            Connection::Kafka(conn) => conn.validate_by_default(),
297            Connection::Csr(conn) => conn.validate_by_default(),
298            Connection::GlueSchemaRegistry(conn) => conn.validate_by_default(),
299            Connection::Postgres(conn) => conn.validate_by_default(),
300            Connection::Ssh(conn) => conn.validate_by_default(),
301            Connection::Aws(conn) => conn.validate_by_default(),
302            Connection::AwsPrivatelink(conn) => conn.validate_by_default(),
303            Connection::MySql(conn) => conn.validate_by_default(),
304            Connection::SqlServer(conn) => conn.validate_by_default(),
305            Connection::IcebergCatalog(conn) => conn.validate_by_default(),
306        }
307    }
308}
309
310impl Connection<InlinedConnection> {
311    /// Validates this connection by attempting to connect to the upstream system.
312    pub async fn validate(
313        &self,
314        id: CatalogItemId,
315        storage_configuration: &StorageConfiguration,
316    ) -> Result<(), ConnectionValidationError> {
317        match self {
318            Connection::Kafka(conn) => conn.validate(id, storage_configuration).await?,
319            Connection::Csr(conn) => conn.validate(id, storage_configuration).await?,
320            Connection::GlueSchemaRegistry(conn) => {
321                conn.validate(id, storage_configuration).await?
322            }
323            Connection::Postgres(conn) => {
324                conn.validate(id, storage_configuration).await?;
325            }
326            Connection::Ssh(conn) => conn.validate(id, storage_configuration).await?,
327            Connection::Aws(conn) => conn.validate(id, storage_configuration).await?,
328            Connection::AwsPrivatelink(conn) => conn.validate(id, storage_configuration).await?,
329            Connection::MySql(conn) => {
330                conn.validate(id, storage_configuration).await?;
331            }
332            Connection::SqlServer(conn) => {
333                conn.validate(id, storage_configuration).await?;
334            }
335            Connection::IcebergCatalog(conn) => conn.validate(id, storage_configuration).await?,
336        }
337        Ok(())
338    }
339
340    pub fn unwrap_kafka(self) -> <InlinedConnection as ConnectionAccess>::Kafka {
341        match self {
342            Self::Kafka(conn) => conn,
343            o => unreachable!("{o:?} is not a Kafka connection"),
344        }
345    }
346
347    pub fn unwrap_pg(self) -> <InlinedConnection as ConnectionAccess>::Pg {
348        match self {
349            Self::Postgres(conn) => conn,
350            o => unreachable!("{o:?} is not a Postgres connection"),
351        }
352    }
353
354    pub fn unwrap_mysql(self) -> <InlinedConnection as ConnectionAccess>::MySql {
355        match self {
356            Self::MySql(conn) => conn,
357            o => unreachable!("{o:?} is not a MySQL connection"),
358        }
359    }
360
361    pub fn unwrap_sql_server(self) -> <InlinedConnection as ConnectionAccess>::SqlServer {
362        match self {
363            Self::SqlServer(conn) => conn,
364            o => unreachable!("{o:?} is not a SQL Server connection"),
365        }
366    }
367
368    pub fn unwrap_aws(self) -> <InlinedConnection as ConnectionAccess>::Aws {
369        match self {
370            Self::Aws(conn) => conn,
371            o => unreachable!("{o:?} is not an AWS connection"),
372        }
373    }
374
375    pub fn unwrap_ssh(self) -> <InlinedConnection as ConnectionAccess>::Ssh {
376        match self {
377            Self::Ssh(conn) => conn,
378            o => unreachable!("{o:?} is not an SSH connection"),
379        }
380    }
381
382    pub fn unwrap_csr(self) -> <InlinedConnection as ConnectionAccess>::Csr {
383        match self {
384            Self::Csr(conn) => conn,
385            o => unreachable!("{o:?} is not a Kafka connection"),
386        }
387    }
388
389    pub fn unwrap_glue_schema_registry(
390        self,
391    ) -> <InlinedConnection as ConnectionAccess>::GlueSchemaRegistry {
392        match self {
393            Self::GlueSchemaRegistry(conn) => conn,
394            o => unreachable!("{o:?} is not an AWS Glue Schema Registry connection"),
395        }
396    }
397
398    pub fn unwrap_iceberg_catalog(self) -> <InlinedConnection as ConnectionAccess>::IcebergCatalog {
399        match self {
400            Self::IcebergCatalog(conn) => conn,
401            o => unreachable!("{o:?} is not an Iceberg catalog connection"),
402        }
403    }
404}
405
/// An error returned by [`Connection::validate`].
///
/// The Postgres, MySQL, SQL Server and AWS variants wrap structured errors that can supply
/// a [`detail`](ConnectionValidationError::detail) and a
/// [`hint`](ConnectionValidationError::hint). [`ConnectionValidationError::Other`] never
/// does.
#[derive(thiserror::Error, Debug)]
pub enum ConnectionValidationError {
    /// Validation of a Postgres connection failed.
    #[error(transparent)]
    Postgres(#[from] PostgresConnectionValidationError),
    /// Validation of a MySQL connection failed.
    #[error(transparent)]
    MySql(#[from] MySqlConnectionValidationError),
    /// Validation of a SQL Server connection failed.
    #[error(transparent)]
    SqlServer(#[from] SqlServerConnectionValidationError),
    /// Validation of an AWS connection failed.
    #[error(transparent)]
    Aws(#[from] AwsConnectionValidationError),
    /// Any other failure, converted from an `anyhow::Error`. It is displayed together with
    /// its full chain of causes.
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}
420
421impl ConnectionValidationError {
422    /// Reports additional details about the error, if any are available.
423    pub fn detail(&self) -> Option<String> {
424        match self {
425            ConnectionValidationError::Postgres(e) => e.detail(),
426            ConnectionValidationError::MySql(e) => e.detail(),
427            ConnectionValidationError::SqlServer(e) => e.detail(),
428            ConnectionValidationError::Aws(e) => e.detail(),
429            ConnectionValidationError::Other(_) => None,
430        }
431    }
432
433    /// Reports a hint for the user about how the error could be fixed.
434    pub fn hint(&self) -> Option<String> {
435        match self {
436            ConnectionValidationError::Postgres(e) => e.hint(),
437            ConnectionValidationError::MySql(e) => e.hint(),
438            ConnectionValidationError::SqlServer(e) => e.hint(),
439            ConnectionValidationError::Aws(e) => e.hint(),
440            ConnectionValidationError::Other(_) => None,
441        }
442    }
443}
444
445impl<C: ConnectionAccess> AlterCompatible for Connection<C> {
446    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
447        match (self, other) {
448            (Self::Aws(s), Self::Aws(o)) => s.alter_compatible(id, o),
449            (Self::AwsPrivatelink(s), Self::AwsPrivatelink(o)) => s.alter_compatible(id, o),
450            (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o),
451            (Self::Csr(s), Self::Csr(o)) => s.alter_compatible(id, o),
452            (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
453            (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
454            (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
455            _ => {
456                tracing::warn!(
457                    "Connection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
458                    self,
459                    other
460                );
461                Err(AlterError { id })
462            }
463        }
464    }
465}
466
/// Settings for a generic Iceberg REST catalog that authenticates with OAuth2.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct RestIcebergCatalog {
    /// For REST catalogs, the oauth2 credential in a `CLIENT_ID:CLIENT_SECRET` format
    pub credential: StringOrSecret,
    /// The oauth2 scope for REST catalogs
    pub scope: Option<String>,
    /// The warehouse for REST catalogs
    pub warehouse: Option<String>,
}
476
/// Settings for an Iceberg REST catalog served by AWS S3 Tables, which authenticates
/// through a referenced AWS connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct S3TablesRestIcebergCatalog<C: ConnectionAccess = InlinedConnection> {
    /// The AWS connection details, for s3tables
    pub aws_connection: AwsConnectionReference<C>,
    /// The warehouse for s3tables
    pub warehouse: String,
}
484
485impl<R: ConnectionResolver> IntoInlineConnection<S3TablesRestIcebergCatalog, R>
486    for S3TablesRestIcebergCatalog<ReferencedConnection>
487{
488    fn into_inline_connection(self, r: R) -> S3TablesRestIcebergCatalog {
489        S3TablesRestIcebergCatalog {
490            aws_connection: self.aws_connection.into_inline_connection(&r),
491            warehouse: self.warehouse,
492        }
493    }
494}
495
/// The kind of an Iceberg catalog, without its settings; see
/// [`IcebergCatalogConnection::catalog_type`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogType {
    /// A generic Iceberg REST catalog.
    Rest,
    /// An Iceberg REST catalog served by AWS S3 Tables.
    S3TablesRest,
}
501
/// The implementation-specific settings of an Iceberg catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogImpl<C: ConnectionAccess = InlinedConnection> {
    /// A generic Iceberg REST catalog.
    Rest(RestIcebergCatalog),
    /// An Iceberg REST catalog served by AWS S3 Tables.
    S3TablesRest(S3TablesRestIcebergCatalog<C>),
}
507
508impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogImpl, R>
509    for IcebergCatalogImpl<ReferencedConnection>
510{
511    fn into_inline_connection(self, r: R) -> IcebergCatalogImpl {
512        match self {
513            IcebergCatalogImpl::Rest(rest) => IcebergCatalogImpl::Rest(rest),
514            IcebergCatalogImpl::S3TablesRest(s3tables) => {
515                IcebergCatalogImpl::S3TablesRest(s3tables.into_inline_connection(r))
516            }
517        }
518    }
519}
520
/// A connection to an Iceberg catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergCatalogConnection<C: ConnectionAccess = InlinedConnection> {
    /// The catalog implementation (generic REST or S3 Tables REST) and its settings.
    pub catalog: IcebergCatalogImpl<C>,
    /// Where the catalog is located; passed to the catalog builder as its `uri` property.
    pub uri: reqwest::Url,
}
528
impl AlterCompatible for IcebergCatalogConnection {
    /// Iceberg catalog connections cannot be altered: every proposed change is rejected.
    fn alter_compatible(&self, id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        Err(AlterError { id })
    }
}
534
535impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogConnection, R>
536    for IcebergCatalogConnection<ReferencedConnection>
537{
538    fn into_inline_connection(self, r: R) -> IcebergCatalogConnection {
539        IcebergCatalogConnection {
540            catalog: self.catalog.into_inline_connection(&r),
541            uri: self.uri,
542        }
543    }
544}
545
impl<C: ConnectionAccess> IcebergCatalogConnection<C> {
    /// Iceberg catalog connections are always validated by default on creation.
    fn validate_by_default(&self) -> bool {
        true
    }
}
551
552impl IcebergCatalogConnection<InlinedConnection> {
553    pub async fn connect(
554        &self,
555        storage_configuration: &StorageConfiguration,
556        in_task: InTask,
557    ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
558        match self.catalog {
559            IcebergCatalogImpl::S3TablesRest(ref s3tables) => {
560                self.connect_s3tables(s3tables, storage_configuration, in_task)
561                    .await
562            }
563            IcebergCatalogImpl::Rest(ref rest) => {
564                self.connect_rest(rest, storage_configuration, in_task)
565                    .await
566            }
567        }
568    }
569
570    pub fn catalog_type(&self) -> IcebergCatalogType {
571        match self.catalog {
572            IcebergCatalogImpl::S3TablesRest(_) => IcebergCatalogType::S3TablesRest,
573            IcebergCatalogImpl::Rest(_) => IcebergCatalogType::Rest,
574        }
575    }
576
577    pub fn s3tables_catalog(&self) -> Option<&S3TablesRestIcebergCatalog> {
578        match &self.catalog {
579            IcebergCatalogImpl::S3TablesRest(s3tables) => Some(s3tables),
580            IcebergCatalogImpl::Rest(_) => None,
581        }
582    }
583
584    pub fn rest_catalog(&self) -> Option<&RestIcebergCatalog> {
585        match &self.catalog {
586            IcebergCatalogImpl::Rest(rest) => Some(rest),
587            IcebergCatalogImpl::S3TablesRest(_) => None,
588        }
589    }
590
591    async fn connect_s3tables(
592        &self,
593        s3tables: &S3TablesRestIcebergCatalog,
594        storage_configuration: &StorageConfiguration,
595        in_task: InTask,
596    ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
597        let secret_reader = &storage_configuration.connection_context.secrets_reader;
598        let aws_ref = &s3tables.aws_connection;
599        let aws_config = aws_ref
600            .connection
601            .load_sdk_config(
602                &storage_configuration.connection_context,
603                aws_ref.connection_id,
604                in_task,
605                ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
606            )
607            .await
608            .with_context(|| {
609                format!(
610                    "failed to load AWS SDK config for S3 Tables Iceberg catalog \
611                     (connection id: {}, auth method: {}, catalog uri: {}, warehouse: {})",
612                    aws_ref.connection_id,
613                    aws_ref.connection.auth_method(),
614                    self.uri,
615                    s3tables.warehouse
616                )
617            })?;
618
619        let aws_region = aws_ref
620            .connection
621            .region
622            .clone()
623            .unwrap_or_else(|| "us-east-1".to_string());
624
625        let mut props = vec![
626            (S3_REGION.to_string(), aws_region),
627            (S3_DISABLE_EC2_METADATA.to_string(), "true".to_string()),
628            (
629                REST_CATALOG_PROP_WAREHOUSE.to_string(),
630                s3tables.warehouse.clone(),
631            ),
632            (REST_CATALOG_PROP_URI.to_string(), self.uri.to_string()),
633        ];
634
635        let aws_auth = aws_ref.connection.auth.clone();
636
637        if let AwsAuth::Credentials(creds) = &aws_auth {
638            props.push((
639                S3_ACCESS_KEY_ID.to_string(),
640                creds
641                    .access_key_id
642                    .get_string(in_task, secret_reader)
643                    .await?,
644            ));
645            props.push((
646                S3_SECRET_ACCESS_KEY.to_string(),
647                secret_reader.read_string(creds.secret_access_key).await?,
648            ));
649        }
650
651        // Build the catalog with aws_config for REST API signing.
652        // For AssumeRole auth, we also add a FileIO extension so OpenDAL can
653        // use our credential chain for S3 object access.
654        let catalog = RestCatalogBuilder::default()
655            .with_aws_config(aws_config.clone())
656            .load("IcebergCatalog", props.into_iter().collect())
657            .await
658            .with_context(|| {
659                format!(
660                    "failed to create S3 Tables Iceberg catalog \
661                     (connection id: {}, catalog uri: {}, warehouse: {})",
662                    aws_ref.connection_id, self.uri, s3tables.warehouse
663                )
664            })?;
665
666        let catalog = if matches!(aws_auth, AwsAuth::AssumeRole(_)) {
667            let credentials_provider = aws_config
668                .credentials_provider()
669                .ok_or_else(|| anyhow!("aws_config missing credentials provider"))?;
670            let file_io_loader = CustomAwsCredentialLoader::new(Arc::new(
671                AwsSdkCredentialLoader::new(credentials_provider),
672            ));
673            catalog.with_file_io_extension(file_io_loader)
674        } else {
675            catalog
676        };
677
678        Ok(Arc::new(catalog))
679    }
680
681    async fn connect_rest(
682        &self,
683        rest: &RestIcebergCatalog,
684        storage_configuration: &StorageConfiguration,
685        in_task: InTask,
686    ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
687        let mut props = BTreeMap::from([(
688            REST_CATALOG_PROP_URI.to_string(),
689            self.uri.to_string().clone(),
690        )]);
691
692        if let Some(warehouse) = &rest.warehouse {
693            props.insert(REST_CATALOG_PROP_WAREHOUSE.to_string(), warehouse.clone());
694        }
695
696        let credential = rest
697            .credential
698            .get_string(
699                in_task,
700                &storage_configuration.connection_context.secrets_reader,
701            )
702            .await
703            .map_err(|e| anyhow!("failed to read Iceberg catalog credential: {e}"))?;
704        props.insert(REST_CATALOG_PROP_CREDENTIAL.to_string(), credential);
705
706        if let Some(scope) = &rest.scope {
707            props.insert(REST_CATALOG_PROP_SCOPE.to_string(), scope.clone());
708        }
709
710        let catalog = RestCatalogBuilder::default()
711            .load("IcebergCatalog", props.into_iter().collect())
712            .await
713            .map_err(|e| anyhow!("failed to create Iceberg catalog: {e}"))?;
714        Ok(Arc::new(catalog))
715    }
716
717    async fn validate(
718        &self,
719        _id: CatalogItemId,
720        storage_configuration: &StorageConfiguration,
721    ) -> Result<(), ConnectionValidationError> {
722        let catalog = self
723            .connect(storage_configuration, InTask::No)
724            .await
725            .map_err(|e| {
726                ConnectionValidationError::Other(anyhow!("failed to connect to catalog: {e}"))
727            })?;
728
729        // If we can list namespaces, the connection is valid.
730        catalog.list_namespaces(None).await.map_err(|e| {
731            ConnectionValidationError::Other(anyhow!("failed to list namespaces: {e}"))
732        })?;
733
734        Ok(())
735    }
736}
737
/// An AWS PrivateLink connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelinkConnection {
    /// The name of the PrivateLink service to connect to.
    pub service_name: String,
    /// The availability zones configured for this connection.
    pub availability_zones: Vec<String>,
}
743
impl AlterCompatible for AwsPrivatelinkConnection {
    /// Always compatible: any AWS PrivateLink connection may be altered into any other.
    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        // Every element of the AwsPrivatelinkConnection connection is configurable.
        Ok(())
    }
}
750
/// TLS settings for a [`KafkaConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaTlsConfig {
    /// The client TLS identity to present, if any.
    pub identity: Option<TlsIdentity>,
    /// A root certificate to trust, if any, given either as a literal string or a secret.
    pub root_cert: Option<StringOrSecret>,
}
756
/// SASL authentication settings for a [`KafkaConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaSaslConfig<C: ConnectionAccess = InlinedConnection> {
    /// The SASL mechanism name.
    pub mechanism: String,
    /// The SASL username, given either as a literal string or a secret.
    pub username: StringOrSecret,
    /// The catalog item holding the password, if any (presumably a secret; confirm).
    pub password: Option<CatalogItemId>,
    /// An AWS connection to authenticate with, if any. This is the only field that holds
    /// a connection reference needing resolution when inlining.
    pub aws: Option<AwsConnectionReference<C>>,
}
764
765impl<R: ConnectionResolver> IntoInlineConnection<KafkaSaslConfig, R>
766    for KafkaSaslConfig<ReferencedConnection>
767{
768    fn into_inline_connection(self, r: R) -> KafkaSaslConfig {
769        KafkaSaslConfig {
770            mechanism: self.mechanism,
771            username: self.username,
772            password: self.password,
773            aws: self.aws.map(|aws| aws.into_inline_connection(&r)),
774        }
775    }
776}
777
/// Specifies a Kafka broker in a [`KafkaConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaBroker<C: ConnectionAccess = InlinedConnection> {
    /// The address of the Kafka broker.
    pub address: String,
    /// An optional tunnel to use when connecting to the broker. It overrides
    /// [`KafkaConnection::default_tunnel`] for this broker.
    pub tunnel: Tunnel<C>,
}
786
787impl<R: ConnectionResolver> IntoInlineConnection<KafkaBroker, R>
788    for KafkaBroker<ReferencedConnection>
789{
790    fn into_inline_connection(self, r: R) -> KafkaBroker {
791        let KafkaBroker { address, tunnel } = self;
792        KafkaBroker {
793            address,
794            tunnel: tunnel.into_inline_connection(r),
795        }
796    }
797}
798
/// Options applied when creating a Kafka topic.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Default)]
pub struct KafkaTopicOptions {
    /// The replication factor for the topic.
    /// If `None`, the broker default will be used.
    pub replication_factor: Option<NonNeg<i32>>,
    /// The number of partitions to create.
    /// If `None`, the broker default will be used.
    pub partition_count: Option<NonNeg<i32>>,
    /// The initial configuration parameters for the topic.
    pub topic_config: BTreeMap<String, String>,
}
810
/// A connection to a Kafka cluster.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaConnection<C: ConnectionAccess = InlinedConnection> {
    /// Statically configured brokers, each with its own tunnel.
    ///
    /// Their addresses form `bootstrap.servers`, unless `default_tunnel` is a
    /// single AWS PrivateLink tunnel, in which case this list must be empty.
    /// When `default_tunnel` is rule-based PrivateLink, at least one broker
    /// is required.
    pub brokers: Vec<KafkaBroker<C>>,
    /// A tunnel through which to route traffic,
    /// that can be overridden for individual brokers
    /// in `brokers`.
    pub default_tunnel: Tunnel<C>,
    /// An explicit progress topic name. When `None`, a name is derived from
    /// the environment ID and the connection ID.
    pub progress_topic: Option<String>,
    /// Topic options associated with the progress topic.
    pub progress_topic_options: KafkaTopicOptions,
    /// Additional user-specified librdkafka options. Settings Materialize
    /// derives itself (e.g. `bootstrap.servers`, `security.protocol`) replace
    /// same-named entries when a client is created.
    pub options: BTreeMap<String, StringOrSecret>,
    /// TLS settings. When set, the security protocol uses SSL.
    pub tls: Option<KafkaTlsConfig>,
    /// SASL settings. When set, the security protocol uses SASL.
    pub sasl: Option<KafkaSaslConfig<C>>,
}
824
825impl<R: ConnectionResolver> IntoInlineConnection<KafkaConnection, R>
826    for KafkaConnection<ReferencedConnection>
827{
828    fn into_inline_connection(self, r: R) -> KafkaConnection {
829        let KafkaConnection {
830            brokers,
831            progress_topic,
832            progress_topic_options,
833            default_tunnel,
834            options,
835            tls,
836            sasl,
837        } = self;
838
839        let brokers = brokers
840            .into_iter()
841            .map(|broker| broker.into_inline_connection(&r))
842            .collect();
843
844        KafkaConnection {
845            brokers,
846            progress_topic,
847            progress_topic_options,
848            default_tunnel: default_tunnel.into_inline_connection(&r),
849            options,
850            tls,
851            sasl: sasl.map(|sasl| sasl.into_inline_connection(&r)),
852        }
853    }
854}
855
856impl<C: ConnectionAccess> KafkaConnection<C> {
857    /// Returns the name of the progress topic to use for the connection.
858    ///
859    /// The caller is responsible for providing the connection ID as it is not
860    /// known to `KafkaConnection`.
861    pub fn progress_topic(
862        &self,
863        connection_context: &ConnectionContext,
864        connection_id: CatalogItemId,
865    ) -> Cow<'_, str> {
866        if let Some(progress_topic) = &self.progress_topic {
867            Cow::Borrowed(progress_topic)
868        } else {
869            Cow::Owned(format!(
870                "_materialize-progress-{}-{}",
871                connection_context.environment_id, connection_id,
872            ))
873        }
874    }
875
876    fn validate_by_default(&self) -> bool {
877        true
878    }
879}
880
881impl KafkaConnection {
882    /// Generates a string that can be used as the base for a configuration ID
883    /// (e.g., `client.id`, `group.id`, `transactional.id`) for a Kafka source
884    /// or sink.
885    pub fn id_base(
886        connection_context: &ConnectionContext,
887        connection_id: CatalogItemId,
888        object_id: GlobalId,
889    ) -> String {
890        format!(
891            "materialize-{}-{}-{}",
892            connection_context.environment_id, connection_id, object_id,
893        )
894    }
895
896    /// Enriches the provided `client_id` according to any enrichment rules in
897    /// the `kafka_client_id_enrichment_rules` configuration parameter.
898    pub fn enrich_client_id(&self, configs: &ConfigSet, client_id: &mut String) {
899        #[derive(Debug, Deserialize)]
900        struct EnrichmentRule {
901            #[serde(deserialize_with = "deserialize_regex")]
902            pattern: Regex,
903            payload: String,
904        }
905
906        fn deserialize_regex<'de, D>(deserializer: D) -> Result<Regex, D::Error>
907        where
908            D: Deserializer<'de>,
909        {
910            let buf = String::deserialize(deserializer)?;
911            Regex::new(&buf).map_err(serde::de::Error::custom)
912        }
913
914        let rules = KAFKA_CLIENT_ID_ENRICHMENT_RULES.get(configs);
915        let rules = match serde_json::from_value::<Vec<EnrichmentRule>>(rules) {
916            Ok(rules) => rules,
917            Err(e) => {
918                warn!(%e, "failed to decode kafka_client_id_enrichment_rules");
919                return;
920            }
921        };
922
923        // Check every rule against every broker. Rules are matched in the order
924        // that they are specified. It is usually a configuration error if
925        // multiple rules match the same list of Kafka brokers, but we
926        // nonetheless want to provide well defined semantics.
927        debug!(?self.brokers, "evaluating client ID enrichment rules");
928        for rule in rules {
929            let is_match = self
930                .brokers
931                .iter()
932                .any(|b| rule.pattern.is_match(&b.address));
933            debug!(?rule, is_match, "evaluated client ID enrichment rule");
934            if is_match {
935                client_id.push('-');
936                client_id.push_str(&rule.payload);
937            }
938        }
939    }
940
941    /// Creates a Kafka client for the connection.
942    pub async fn create_with_context<C, T>(
943        &self,
944        storage_configuration: &StorageConfiguration,
945        context: C,
946        extra_options: &BTreeMap<&str, String>,
947        in_task: InTask,
948    ) -> Result<T, ContextCreationError>
949    where
950        C: ClientContext,
951        T: FromClientConfigAndContext<TunnelingClientContext<C>>,
952    {
953        let mut options = self.options.clone();
954
955        // Ensure that Kafka topics are *not* automatically created when
956        // consuming, producing, or fetching metadata for a topic. This ensures
957        // that we don't accidentally create topics with the wrong number of
958        // partitions.
959        options.insert("allow.auto.create.topics".into(), "false".into());
960
961        let brokers = match &self.default_tunnel {
962            Tunnel::AwsPrivatelink(t) => {
963                assert!(&self.brokers.is_empty());
964
965                let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM
966                    .get(storage_configuration.config_set());
967                options.insert("ssl.endpoint.identification.algorithm".into(), algo.into());
968
969                // When using a default privatelink tunnel broker/brokers cannot be specified
970                // instead the tunnel connection_id and port are used for the initial connection.
971                format!(
972                    "{}:{}",
973                    vpc_endpoint_host(
974                        t.connection_id,
975                        None, // Default tunnel does not support availability zones.
976                    ),
977                    t.port.unwrap_or(9092)
978                )
979            }
980            Tunnel::AwsPrivatelinks(_pl) => {
981                let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM
982                    .get(storage_configuration.config_set());
983                options.insert("ssl.endpoint.identification.algorithm".into(), algo.into());
984
985                if self.brokers.is_empty() {
986                    return Err(ContextCreationError::Other(anyhow::anyhow!(
987                        "at least one static broker is required when using BROKER or BROKERS"
988                    )));
989                }
990                self.brokers.iter().map(|b| &b.address).join(",")
991            }
992            _ => self.brokers.iter().map(|b| &b.address).join(","),
993        };
994        options.insert("bootstrap.servers".into(), brokers.clone().into());
995        let security_protocol = match (self.tls.is_some(), self.sasl.is_some()) {
996            (false, false) => "PLAINTEXT",
997            (true, false) => "SSL",
998            (false, true) => "SASL_PLAINTEXT",
999            (true, true) => "SASL_SSL",
1000        };
1001        info!(
1002            "kafka: create_with_context bootstrap.servers={brokers}, security_protocol={security_protocol}"
1003        );
1004        options.insert("security.protocol".into(), security_protocol.into());
1005        if let Some(tls) = &self.tls {
1006            if let Some(root_cert) = &tls.root_cert {
1007                options.insert("ssl.ca.pem".into(), root_cert.clone());
1008            }
1009            if let Some(identity) = &tls.identity {
1010                options.insert("ssl.key.pem".into(), StringOrSecret::Secret(identity.key));
1011                options.insert("ssl.certificate.pem".into(), identity.cert.clone());
1012            }
1013        }
1014        if let Some(sasl) = &self.sasl {
1015            options.insert("sasl.mechanisms".into(), (&sasl.mechanism).into());
1016            options.insert("sasl.username".into(), sasl.username.clone());
1017            if let Some(password) = sasl.password {
1018                options.insert("sasl.password".into(), StringOrSecret::Secret(password));
1019            }
1020        }
1021
1022        options.insert(
1023            "retry.backoff.ms".into(),
1024            KAFKA_RETRY_BACKOFF
1025                .get(storage_configuration.config_set())
1026                .as_millis()
1027                .into(),
1028        );
1029        options.insert(
1030            "retry.backoff.max.ms".into(),
1031            KAFKA_RETRY_BACKOFF_MAX
1032                .get(storage_configuration.config_set())
1033                .as_millis()
1034                .into(),
1035        );
1036        options.insert(
1037            "reconnect.backoff.ms".into(),
1038            KAFKA_RECONNECT_BACKOFF
1039                .get(storage_configuration.config_set())
1040                .as_millis()
1041                .into(),
1042        );
1043        options.insert(
1044            "reconnect.backoff.max.ms".into(),
1045            KAFKA_RECONNECT_BACKOFF_MAX
1046                .get(storage_configuration.config_set())
1047                .as_millis()
1048                .into(),
1049        );
1050
1051        let mut config = mz_kafka_util::client::create_new_client_config(
1052            storage_configuration
1053                .connection_context
1054                .librdkafka_log_level,
1055            storage_configuration.parameters.kafka_timeout_config,
1056        );
1057        for (k, v) in options {
1058            config.set(
1059                k,
1060                v.get_string(
1061                    in_task,
1062                    &storage_configuration.connection_context.secrets_reader,
1063                )
1064                .await
1065                .context("reading kafka secret")?,
1066            );
1067        }
1068        for (k, v) in extra_options {
1069            config.set(*k, v);
1070        }
1071
1072        let aws_config = match self.sasl.as_ref().and_then(|sasl| sasl.aws.as_ref()) {
1073            None => None,
1074            Some(aws) => Some(
1075                aws.connection
1076                    .load_sdk_config(
1077                        &storage_configuration.connection_context,
1078                        aws.connection_id,
1079                        in_task,
1080                        ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1081                    )
1082                    .await?,
1083            ),
1084        };
1085
1086        // TODO(roshan): Implement enforcement of external address validation once
1087        // rdkafka client has been updated to support providing multiple resolved
1088        // addresses for brokers
1089        let mut context = TunnelingClientContext::new(
1090            context,
1091            Handle::current(),
1092            storage_configuration
1093                .connection_context
1094                .ssh_tunnel_manager
1095                .clone(),
1096            storage_configuration.parameters.ssh_timeout_config,
1097            aws_config,
1098            in_task,
1099        );
1100
1101        match &self.default_tunnel {
1102            Tunnel::Direct => {
1103                // By default, don't offer a default override for broker address lookup.
1104            }
1105            Tunnel::AwsPrivatelink(pl) => {
1106                context.set_default_tunnel(TunnelConfig::StaticHost(
1107                    // Possible bug: We have been ignoring the configured port.
1108                    KafkaConnection::from_default_aws_privatelink(pl).host,
1109                ));
1110            }
1111            Tunnel::AwsPrivatelinks(pl) => {
1112                context.set_default_tunnel(TunnelConfig::Rules(
1113                    KafkaConnection::from_aws_privatelinks(pl),
1114                ));
1115            }
1116            Tunnel::Ssh(ssh_tunnel) => {
1117                let secret = storage_configuration
1118                    .connection_context
1119                    .secrets_reader
1120                    .read_in_task_if(in_task, ssh_tunnel.connection_id)
1121                    .await?;
1122                let key_pair = SshKeyPair::from_bytes(&secret)?;
1123
1124                // Ensure any ssh-bastion address we connect to is resolved to an external address.
1125                let resolved = resolve_address(
1126                    &ssh_tunnel.connection.host,
1127                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1128                )
1129                .await?;
1130                context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig {
1131                    host: resolved
1132                        .iter()
1133                        .map(|a| a.to_string())
1134                        .collect::<BTreeSet<_>>(),
1135                    port: ssh_tunnel.connection.port,
1136                    user: ssh_tunnel.connection.user.clone(),
1137                    key_pair,
1138                }));
1139            }
1140        }
1141        info!(
1142            "kafka: tunnel config set to {}",
1143            match &self.default_tunnel {
1144                Tunnel::Direct => "Direct".to_string(),
1145                Tunnel::AwsPrivatelink(_) => "AwsPrivatelink (static host)".to_string(),
1146                Tunnel::AwsPrivatelinks(pl) =>
1147                    format!("AwsPrivatelinks ({} rules)", pl.rules.len()),
1148                Tunnel::Ssh(_) => "Ssh".to_string(),
1149            }
1150        );
1151
1152        // Here, we preemptively rewrite broker addresses.
1153        // In concept, this overlaps with 'TunnelingClientContext::resolve_broker_addr'.
1154        for broker in &self.brokers {
1155            let mut addr_parts = broker.address.splitn(2, ':');
1156            let addr = BrokerAddr {
1157                host: addr_parts
1158                    .next()
1159                    .context("BROKER is not address:port")?
1160                    .into(),
1161                port: addr_parts
1162                    .next()
1163                    .unwrap_or("9092")
1164                    .parse()
1165                    .context("parsing BROKER port")?,
1166            };
1167            match &broker.tunnel {
1168                Tunnel::Direct => {
1169                    // By default, don't override broker address lookup.
1170                    //
1171                    // N.B.
1172                    //
1173                    // We _could_ pre-setup the default ssh tunnel for all known brokers here, but
1174                    // we avoid doing because:
1175                    // - Its not necessary.
1176                    // - Not doing so makes it easier to test the `FailedDefaultSshTunnel` path
1177                    // in the `TunnelingClientContext`.
1178                }
1179                Tunnel::AwsPrivatelink(aws_privatelink) => {
1180                    context.add_broker_rewrite(
1181                        addr,
1182                        KafkaConnection::from_aws_privatelink(aws_privatelink),
1183                    );
1184                }
1185                Tunnel::AwsPrivatelinks(_) => unreachable!(
1186                    "Individually predefined brokers do not use rule-based PrivateLinks routing."
1187                ),
1188                Tunnel::Ssh(ssh_tunnel) => {
1189                    // Ensure any SSH bastion address we connect to is resolved to an external address.
1190                    let ssh_host_resolved = resolve_address(
1191                        &ssh_tunnel.connection.host,
1192                        ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1193                    )
1194                    .await?;
1195                    context
1196                        .add_ssh_tunnel(
1197                            addr,
1198                            SshTunnelConfig {
1199                                host: ssh_host_resolved
1200                                    .iter()
1201                                    .map(|a| a.to_string())
1202                                    .collect::<BTreeSet<_>>(),
1203                                port: ssh_tunnel.connection.port,
1204                                user: ssh_tunnel.connection.user.clone(),
1205                                key_pair: SshKeyPair::from_bytes(
1206                                    &storage_configuration
1207                                        .connection_context
1208                                        .secrets_reader
1209                                        .read_in_task_if(in_task, ssh_tunnel.connection_id)
1210                                        .await?,
1211                                )?,
1212                            },
1213                        )
1214                        .await
1215                        .map_err(ContextCreationError::Ssh)?;
1216                }
1217            }
1218        }
1219
1220        Ok(config.create_with_context(context)?)
1221    }
1222
1223    async fn validate(
1224        &self,
1225        _id: CatalogItemId,
1226        storage_configuration: &StorageConfiguration,
1227    ) -> Result<(), anyhow::Error> {
1228        let (context, error_rx) = MzClientContext::with_errors();
1229        let consumer: BaseConsumer<_> = self
1230            .create_with_context(
1231                storage_configuration,
1232                context,
1233                &BTreeMap::new(),
1234                // We are in a normal tokio context during validation, already.
1235                InTask::No,
1236            )
1237            .await?;
1238        let consumer = Arc::new(consumer);
1239
1240        let timeout = storage_configuration
1241            .parameters
1242            .kafka_timeout_config
1243            .fetch_metadata_timeout;
1244
1245        // librdkafka doesn't expose an API for determining whether a connection to
1246        // the Kafka cluster has been successfully established. So we make a
1247        // metadata request, though we don't care about the results, so that we can
1248        // report any errors making that request. If the request succeeds, we know
1249        // we were able to contact at least one broker, and that's a good proxy for
1250        // being able to contact all the brokers in the cluster.
1251        //
1252        // The downside of this approach is it produces a generic error message like
1253        // "metadata fetch error" with no additional details. The real networking
1254        // error is buried in the librdkafka logs, which are not visible to users.
1255        info!("kafka: starting connection validation via fetch_metadata (timeout={timeout:?})");
1256        let result = mz_ore::task::spawn_blocking(|| "kafka_get_metadata", {
1257            let consumer = Arc::clone(&consumer);
1258            move || consumer.fetch_metadata(None, timeout)
1259        })
1260        .await;
1261        info!(
1262            "kafka: connection validation result: {}",
1263            if result.is_ok() { "success" } else { "failed" },
1264        );
1265        match result {
1266            Ok(_) => Ok(()),
1267            // The error returned by `fetch_metadata` does not provide any details which makes for
1268            // a crappy user facing error message. For this reason we attempt to grab a better
1269            // error message from the client context, which should contain any error logs emitted
1270            // by librdkafka, and fallback to the generic error if there is nothing there.
1271            Err(err) => {
1272                // Multiple errors might have been logged during this validation but some are more
1273                // relevant than others. Specifically, we prefer non-internal errors over internal
1274                // errors since those give much more useful information to the users.
1275                let main_err = error_rx.try_iter().reduce(|cur, new| match cur {
1276                    MzKafkaError::Internal(_) => new,
1277                    _ => cur,
1278                });
1279
1280                // Don't drop the consumer until after we've drained the errors
1281                // channel. Dropping the consumer can introduce spurious errors.
1282                // See database-issues#7432.
1283                drop(consumer);
1284
1285                match main_err {
1286                    Some(err) => Err(err.into()),
1287                    None => Err(err.into()),
1288                }
1289            }
1290        }
1291    }
1292
1293    /// The "default" PrivateLink connection is used for bootstrapping Kafka.
1294    fn from_default_aws_privatelink(pl: &AwsPrivatelink) -> BrokerRewrite {
1295        BrokerRewrite {
1296            host: vpc_endpoint_host(
1297                pl.connection_id,
1298                None, // Default tunnel does not support availability zones.
1299            ),
1300            port: pl.port,
1301        }
1302    }
1303
1304    /// The "not default" PrivateLink connections are used for routing to specific Kafka brokers.
1305    fn from_aws_privatelink(pl: &AwsPrivatelink) -> BrokerRewrite {
1306        BrokerRewrite {
1307            host: vpc_endpoint_host(pl.connection_id, pl.availability_zone.as_deref()),
1308            port: pl.port,
1309        }
1310    }
1311
1312    fn from_aws_privatelink_rule(
1313        AwsPrivatelinkRule { pattern, to }: &AwsPrivatelinkRule,
1314    ) -> (mz_kafka_util::client::ConnectionRulePattern, BrokerRewrite) {
1315        (
1316            mz_kafka_util::client::ConnectionRulePattern {
1317                prefix_wildcard: pattern.prefix_wildcard,
1318                literal_match: pattern.literal_match.clone(),
1319                suffix_wildcard: pattern.suffix_wildcard,
1320            },
1321            KafkaConnection::from_aws_privatelink(to),
1322        )
1323    }
1324
1325    fn from_aws_privatelinks(pl: &AwsPrivatelinks) -> HostMappingRules {
1326        HostMappingRules {
1327            rules: pl
1328                .rules
1329                .iter()
1330                .map(KafkaConnection::from_aws_privatelink_rule)
1331                .collect_vec(),
1332        }
1333    }
1334}
1335
1336impl<C: ConnectionAccess> AlterCompatible for KafkaConnection<C> {
1337    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1338        let KafkaConnection {
1339            brokers: _,
1340            default_tunnel: _,
1341            progress_topic,
1342            progress_topic_options,
1343            options: _,
1344            tls: _,
1345            sasl: _,
1346        } = self;
1347
1348        let compatibility_checks = [
1349            (progress_topic == &other.progress_topic, "progress_topic"),
1350            (
1351                progress_topic_options == &other.progress_topic_options,
1352                "progress_topic_options",
1353            ),
1354        ];
1355
1356        for (compatible, field) in compatibility_checks {
1357            if !compatible {
1358                tracing::warn!(
1359                    "KafkaConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1360                    self,
1361                    other
1362                );
1363
1364                return Err(AlterError { id });
1365            }
1366        }
1367
1368        Ok(())
1369    }
1370}
1371
/// A connection to a Confluent Schema Registry.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnection<C: ConnectionAccess = InlinedConnection> {
    /// The URL of the schema registry. A host is required to connect.
    pub url: Url,
    /// Trusted root TLS certificate in PEM format.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication with the schema
    /// registry.
    pub tls_identity: Option<TlsIdentity>,
    /// Optional HTTP authentication credentials for the schema registry.
    pub http_auth: Option<CsrConnectionHttpAuth>,
    /// A tunnel through which to route traffic. Rule-based
    /// `Tunnel::AwsPrivatelinks` is not supported. An AWS PrivateLink tunnel
    /// must not specify a port.
    pub tunnel: Tunnel<C>,
}
1387
1388impl<R: ConnectionResolver> IntoInlineConnection<CsrConnection, R>
1389    for CsrConnection<ReferencedConnection>
1390{
1391    fn into_inline_connection(self, r: R) -> CsrConnection {
1392        let CsrConnection {
1393            url,
1394            tls_root_cert,
1395            tls_identity,
1396            http_auth,
1397            tunnel,
1398        } = self;
1399        CsrConnection {
1400            url,
1401            tls_root_cert,
1402            tls_identity,
1403            http_auth,
1404            tunnel: tunnel.into_inline_connection(r),
1405        }
1406    }
1407}
1408
1409impl<C: ConnectionAccess> CsrConnection<C> {
1410    fn validate_by_default(&self) -> bool {
1411        true
1412    }
1413}
1414
1415impl CsrConnection {
1416    /// Constructs a schema registry client from the connection.
1417    pub async fn connect(
1418        &self,
1419        storage_configuration: &StorageConfiguration,
1420        in_task: InTask,
1421    ) -> Result<mz_ccsr::Client, CsrConnectError> {
1422        let mut client_config = mz_ccsr::ClientConfig::new(self.url.clone());
1423        if let Some(root_cert) = &self.tls_root_cert {
1424            let root_cert = root_cert
1425                .get_string(
1426                    in_task,
1427                    &storage_configuration.connection_context.secrets_reader,
1428                )
1429                .await?;
1430            let root_cert = Certificate::from_pem(root_cert.as_bytes())?;
1431            client_config = client_config.add_root_certificate(root_cert);
1432        }
1433
1434        if let Some(tls_identity) = &self.tls_identity {
1435            let key = &storage_configuration
1436                .connection_context
1437                .secrets_reader
1438                .read_string_in_task_if(in_task, tls_identity.key)
1439                .await?;
1440            let cert = tls_identity
1441                .cert
1442                .get_string(
1443                    in_task,
1444                    &storage_configuration.connection_context.secrets_reader,
1445                )
1446                .await?;
1447            let ident = Identity::from_pem(key.as_bytes(), cert.as_bytes())?;
1448            client_config = client_config.identity(ident);
1449        }
1450
1451        if let Some(http_auth) = &self.http_auth {
1452            let username = http_auth
1453                .username
1454                .get_string(
1455                    in_task,
1456                    &storage_configuration.connection_context.secrets_reader,
1457                )
1458                .await?;
1459            let password = match http_auth.password {
1460                None => None,
1461                Some(password) => Some(
1462                    storage_configuration
1463                        .connection_context
1464                        .secrets_reader
1465                        .read_string_in_task_if(in_task, password)
1466                        .await?,
1467                ),
1468            };
1469            client_config = client_config.auth(username, password);
1470        }
1471
1472        // TODO: use types to enforce that the URL has a string hostname.
1473        let host = self
1474            .url
1475            .host_str()
1476            .ok_or_else(|| anyhow!("url missing host"))?;
1477        match &self.tunnel {
1478            Tunnel::Direct => {
1479                // Ensure any host we connect to is resolved to an external address.
1480                let resolved = resolve_address(
1481                    host,
1482                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1483                )
1484                .await?;
1485                client_config = client_config.resolve_to_addrs(
1486                    host,
1487                    &resolved
1488                        .iter()
1489                        .map(|addr| SocketAddr::new(*addr, 0))
1490                        .collect::<Vec<_>>(),
1491                )
1492            }
1493            Tunnel::Ssh(ssh_tunnel) => {
1494                let ssh_tunnel = ssh_tunnel
1495                    .connect(
1496                        storage_configuration,
1497                        host,
1498                        // Honor the URL scheme's default port (443 for https,
1499                        // 80 for http) if no explicit port was provided.
1500                        self.url.port_or_known_default().unwrap_or(80),
1501                        in_task,
1502                    )
1503                    .await
1504                    .map_err(CsrConnectError::Ssh)?;
1505
1506                // Carefully inject the SSH tunnel into the client
1507                // configuration. This is delicate because we need TLS
1508                // verification to continue to use the remote hostname rather
1509                // than the tunnel hostname.
1510
1511                client_config = client_config
1512                    // `resolve_to_addrs` allows us to rewrite the hostname
1513                    // at the DNS level, which means the TCP connection is
1514                    // correctly routed through the tunnel, but TLS verification
1515                    // is still performed against the remote hostname.
1516                    // Unfortunately the port here is ignored if the URL also
1517                    // specifies a port...
1518                    .resolve_to_addrs(host, &[SocketAddr::new(ssh_tunnel.local_addr().ip(), 0)])
1519                    // ...so we also dynamically rewrite the URL to use the
1520                    // current port for the SSH tunnel.
1521                    //
1522                    // WARNING: this is brittle, because we only dynamically
1523                    // update the client configuration with the tunnel *port*,
1524                    // and not the hostname This works fine in practice, because
1525                    // only the SSH tunnel port will change if the tunnel fails
1526                    // and has to be restarted (the hostname is always
1527                    // 127.0.0.1)--but this is an an implementation detail of
1528                    // the SSH tunnel code that we're relying on.
1529                    .dynamic_url({
1530                        let remote_url = self.url.clone();
1531                        move || {
1532                            let mut url = remote_url.clone();
1533                            url.set_port(Some(ssh_tunnel.local_addr().port()))
1534                                .expect("cannot fail");
1535                            url
1536                        }
1537                    });
1538            }
1539            Tunnel::AwsPrivatelink(connection) => {
1540                assert_none!(connection.port);
1541
1542                let privatelink_host = mz_cloud_resources::vpc_endpoint_host(
1543                    connection.connection_id,
1544                    connection.availability_zone.as_deref(),
1545                );
1546                let addrs: Vec<_> = net::lookup_host((privatelink_host, 0))
1547                    .await
1548                    .context("resolving PrivateLink host")?
1549                    .collect();
1550                client_config = client_config.resolve_to_addrs(host, &addrs)
1551            }
1552            Tunnel::AwsPrivatelinks(_) => {
1553                unreachable!("MATCHING broker rules are only available for Kafka connections.");
1554            }
1555        }
1556
1557        Ok(client_config.build()?)
1558    }
1559
1560    async fn validate(
1561        &self,
1562        _id: CatalogItemId,
1563        storage_configuration: &StorageConfiguration,
1564    ) -> Result<(), anyhow::Error> {
1565        let client = self
1566            .connect(
1567                storage_configuration,
1568                // We are in a normal tokio context during validation, already.
1569                InTask::No,
1570            )
1571            .await?;
1572        client.list_subjects().await?;
1573        Ok(())
1574    }
1575}
1576
1577impl<C: ConnectionAccess> AlterCompatible for CsrConnection<C> {
1578    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1579        let CsrConnection {
1580            tunnel,
1581            // All non-tunnel fields may change
1582            url: _,
1583            tls_root_cert: _,
1584            tls_identity: _,
1585            http_auth: _,
1586        } = self;
1587
1588        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1589
1590        for (compatible, field) in compatibility_checks {
1591            if !compatible {
1592                tracing::warn!(
1593                    "CsrConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1594                    self,
1595                    other
1596                );
1597
1598                return Err(AlterError { id });
1599            }
1600        }
1601        Ok(())
1602    }
1603}
1604
/// A connection to an AWS Glue Schema Registry.
///
/// AWS credentials, region, and endpoint are inherited from the referenced
/// [`AwsConnection`]; this struct only carries the per-registry settings.
///
/// NOTE: Stage 1 of the GSR rollout. The client crate
/// (`mz-aws-glue-schema-registry`) does not exist yet; `validate` is a
/// no-op until Stage 3 retrofits a real `GetRegistry` ping. The connection
/// can be created and inspected, but cannot yet be attached to a source or
/// sink.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct GlueSchemaRegistryConnection<C: ConnectionAccess = InlinedConnection> {
    /// The referenced AWS connection that supplies credentials, region, and
    /// (optional) endpoint.
    ///
    /// Resolved through `IntoInlineConnection` when converting from the
    /// `ReferencedConnection` form.
    pub aws_connection: AwsConnectionReference<C>,
    /// The Glue Schema Registry name within the AWS account/region.
    ///
    /// This field may not change across an `ALTER CONNECTION`; the
    /// connection's `AlterCompatible` check rejects a differing name.
    pub registry_name: String,
}
1623
1624impl<R: ConnectionResolver> IntoInlineConnection<GlueSchemaRegistryConnection, R>
1625    for GlueSchemaRegistryConnection<ReferencedConnection>
1626{
1627    fn into_inline_connection(self, r: R) -> GlueSchemaRegistryConnection {
1628        let GlueSchemaRegistryConnection {
1629            aws_connection,
1630            registry_name,
1631        } = self;
1632        GlueSchemaRegistryConnection {
1633            aws_connection: aws_connection.into_inline_connection(&r),
1634            registry_name,
1635        }
1636    }
1637}
1638
1639impl<C: ConnectionAccess> GlueSchemaRegistryConnection<C> {
1640    fn validate_by_default(&self) -> bool {
1641        // Stage 1 of the AWS Glue Schema Registry rollout: the client crate
1642        // does not exist yet, and the no-op `validate` below succeeds
1643        // unconditionally. Default-validating preserves the API contract so
1644        // that Stage 3's real `GetRegistry` ping slots in without
1645        // behavioral change.
1646        true
1647    }
1648}
1649
1650impl GlueSchemaRegistryConnection {
1651    async fn validate(
1652        &self,
1653        _id: CatalogItemId,
1654        _storage_configuration: &StorageConfiguration,
1655    ) -> Result<(), anyhow::Error> {
1656        // Stage 1: no-op. Real validation arrives in Stage 3 when the
1657        // Glue client crate exists. Until then a `CREATE CONNECTION`
1658        // succeeds even against a registry that doesn't exist; the
1659        // failure will surface on first use (which is itself gated until
1660        // source/sink integration lands in Stages 4/5).
1661        std::future::ready(Ok(())).await
1662    }
1663}
1664
1665impl<C: ConnectionAccess> AlterCompatible for GlueSchemaRegistryConnection<C> {
1666    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1667        let GlueSchemaRegistryConnection {
1668            registry_name,
1669            // The referenced AWS connection itself may be swapped; matches
1670            // the permissive policy of MySqlConnection / SqlServerConnection.
1671            aws_connection: _,
1672        } = self;
1673
1674        let compatibility_checks = [(registry_name == &other.registry_name, "registry_name")];
1675
1676        for (compatible, field) in compatibility_checks {
1677            if !compatible {
1678                tracing::warn!(
1679                    "GlueSchemaRegistryConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1680                    self,
1681                    other
1682                );
1683
1684                return Err(AlterError { id });
1685            }
1686        }
1687        Ok(())
1688    }
1689}
1690
/// A TLS key pair used for client identity.
///
/// Used by the PostgreSQL and MySQL connections (and CSR) to authenticate
/// the client to the server.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct TlsIdentity {
    /// The client's TLS public certificate in PEM format.
    ///
    /// Given either inline or as a secret, and read via
    /// `StringOrSecret::get_string`.
    pub cert: StringOrSecret,
    /// The ID of the secret containing the client's TLS private key in PEM
    /// format.
    pub key: CatalogItemId,
}
1700
/// HTTP authentication credentials in a [`CsrConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnectionHttpAuth {
    /// The username, given either inline or as a secret.
    pub username: StringOrSecret,
    /// The ID of the secret containing the password, if any.
    ///
    /// `None` means no password is supplied.
    pub password: Option<CatalogItemId>,
}
1709
/// A connection to a PostgreSQL server.
///
/// With `C = ReferencedConnection`, the SSH tunnel (if any) is still a
/// catalog reference; `IntoInlineConnection` resolves it.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct PostgresConnection<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// The name of the database to connect to.
    pub database: String,
    /// The username to authenticate as, given either inline or as a secret.
    pub user: StringOrSecret,
    /// The ID of the secret containing the password, if any.
    pub password: Option<CatalogItemId>,
    /// A tunnel through which to route traffic.
    ///
    /// `Tunnel::AwsPrivatelinks` is not supported for PostgreSQL.
    pub tunnel: Tunnel<C>,
    /// Whether to use TLS for encryption, authentication, or both.
    pub tls_mode: SslMode,
    /// An optional root TLS certificate in PEM format, used to verify the
    /// server's identity.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication.
    pub tls_identity: Option<TlsIdentity>,
}
1733
1734impl<R: ConnectionResolver> IntoInlineConnection<PostgresConnection, R>
1735    for PostgresConnection<ReferencedConnection>
1736{
1737    fn into_inline_connection(self, r: R) -> PostgresConnection {
1738        let PostgresConnection {
1739            host,
1740            port,
1741            database,
1742            user,
1743            password,
1744            tunnel,
1745            tls_mode,
1746            tls_root_cert,
1747            tls_identity,
1748        } = self;
1749
1750        PostgresConnection {
1751            host,
1752            port,
1753            database,
1754            user,
1755            password,
1756            tunnel: tunnel.into_inline_connection(r),
1757            tls_mode,
1758            tls_root_cert,
1759            tls_identity,
1760        }
1761    }
1762}
1763
1764impl<C: ConnectionAccess> PostgresConnection<C> {
1765    fn validate_by_default(&self) -> bool {
1766        true
1767    }
1768}
1769
impl PostgresConnection<InlinedConnection> {
    /// Builds the `mz_postgres_util::Config` used to connect to this
    /// PostgreSQL server.
    ///
    /// Reads the user, password, and TLS material through `secrets_reader`.
    /// Applies the `pg_source_*` timeout and keepalive parameters from
    /// `storage_configuration`. Translates the connection's [`Tunnel`] into a
    /// `mz_postgres_util::TunnelConfig`.
    ///
    /// `in_task` is forwarded to the secret reads (`*_in_task_if`) and to
    /// `mz_postgres_util::Config::new`.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the following fails:
    /// - a secret cannot be read;
    /// - an SSH key pair cannot be parsed;
    /// - host resolution via `resolve_address` fails (presumably including
    ///   rejection of non-external addresses when `ENFORCE_EXTERNAL_ADDRESSES`
    ///   is set);
    /// - `mz_postgres_util::Config::new` fails.
    ///
    /// # Panics
    ///
    /// Panics if an AWS PrivateLink tunnel carries a port, or if the tunnel is
    /// `Tunnel::AwsPrivatelinks`, which is only supported for Kafka.
    pub async fn config(
        &self,
        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_postgres_util::Config, anyhow::Error> {
        let params = &storage_configuration.parameters;

        // Basic connection parameters. The user name may itself be a secret.
        let mut config = tokio_postgres::Config::new();
        config
            .host(&self.host)
            .port(self.port)
            .dbname(&self.database)
            .user(&self.user.get_string(in_task, secrets_reader).await?)
            .ssl_mode(self.tls_mode);
        if let Some(password) = self.password {
            let password = secrets_reader
                .read_string_in_task_if(in_task, password)
                .await?;
            config.password(password);
        }
        if let Some(tls_root_cert) = &self.tls_root_cert {
            let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
            config.ssl_root_cert(tls_root_cert.as_bytes());
        }
        if let Some(tls_identity) = &self.tls_identity {
            let cert = tls_identity
                .cert
                .get_string(in_task, secrets_reader)
                .await?;
            let key = secrets_reader
                .read_string_in_task_if(in_task, tls_identity.key)
                .await?;
            config.ssl_cert(cert.as_bytes()).ssl_key(key.as_bytes());
        }

        // Client-side connect timeout and TCP keepalive settings. Each one is
        // applied only when the corresponding system parameter is set.
        if let Some(connect_timeout) = params.pg_source_connect_timeout {
            config.connect_timeout(connect_timeout);
        }
        if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
            config.keepalives_retries(keepalives_retries);
        }
        if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
            config.keepalives_idle(keepalives_idle);
        }
        if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
            config.keepalives_interval(keepalives_interval);
        }
        if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
            config.tcp_user_timeout(tcp_user_timeout);
        }

        // Server-side settings, passed to the backend through the startup
        // `options` parameter as space-separated `--name=value` pairs.
        let mut options = vec![];
        if let Some(wal_sender_timeout) = params.pg_source_wal_sender_timeout {
            options.push(format!(
                "--wal_sender_timeout={}",
                wal_sender_timeout.as_millis()
            ));
        };
        // Optionally mirror the TCP keepalive/user-timeout settings on the
        // server side. The idle and interval values are sent in whole seconds,
        // so sub-second precision is truncated. The user timeout is sent in
        // milliseconds.
        if params.pg_source_tcp_configure_server {
            if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
                options.push(format!("--tcp_keepalives_count={}", keepalives_retries));
            }
            if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
                options.push(format!(
                    "--tcp_keepalives_idle={}",
                    keepalives_idle.as_secs()
                ));
            }
            if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
                options.push(format!(
                    "--tcp_keepalives_interval={}",
                    keepalives_interval.as_secs()
                ));
            }
            if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
                options.push(format!(
                    "--tcp_user_timeout={}",
                    tcp_user_timeout.as_millis()
                ));
            }
        }
        // NOTE: `options` is always set, even when nothing was collected, in
        // which case it is the empty string.
        config.options(options.join(" ").as_str());

        // Translate the connection's tunnel into the utility crate's tunnel
        // configuration.
        let tunnel = match &self.tunnel {
            Tunnel::Direct => {
                // Ensure any host we connect to is resolved to an external address.
                let resolved = resolve_address(
                    &self.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_postgres_util::TunnelConfig::Direct {
                    resolved_ips: Some(resolved),
                }
            }
            Tunnel::Ssh(SshTunnel {
                connection_id,
                connection,
            }) => {
                // The SSH key pair is stored as a secret under the SSH
                // connection's ID.
                let secret = secrets_reader
                    .read_in_task_if(in_task, *connection_id)
                    .await?;
                let key_pair = SshKeyPair::from_bytes(&secret)?;
                // Ensure any ssh-bastion host we connect to is resolved to an external address.
                let resolved = resolve_address(
                    &connection.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_postgres_util::TunnelConfig::Ssh {
                    config: SshTunnelConfig {
                        host: resolved
                            .iter()
                            .map(|a| a.to_string())
                            .collect::<BTreeSet<_>>(),
                        port: connection.port,
                        user: connection.user.clone(),
                        key_pair,
                    },
                }
            }
            Tunnel::AwsPrivatelink(connection) => {
                // A PrivateLink tunnel for PostgreSQL must not carry a port;
                // this panics otherwise.
                assert_none!(connection.port);
                mz_postgres_util::TunnelConfig::AwsPrivatelink {
                    connection_id: connection.connection_id,
                }
            }
            Tunnel::AwsPrivatelinks(_) => {
                unreachable!("MATCHING broker rules are only available for Kafka connections.");
            }
        };

        Ok(mz_postgres_util::Config::new(
            config,
            tunnel,
            params.ssh_timeout_config,
            in_task,
        )?)
    }

    /// Connects to the server and checks that it is ready for logical
    /// replication.
    ///
    /// The following must hold, otherwise a
    /// [`PostgresConnectionValidationError`] is returned (wrapped in
    /// `anyhow::Error`):
    /// - `wal_level` is at least `logical`;
    /// - `max_wal_senders` is at least 1;
    /// - at least two replication slots are available.
    ///
    /// On success, returns the connected client.
    pub async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<mz_postgres_util::Client, anyhow::Error> {
        let config = self
            .config(
                &storage_configuration.connection_context.secrets_reader,
                storage_configuration,
                // We are in a normal tokio context during validation, already.
                InTask::No,
            )
            .await?;
        let client = config
            .connect(
                "connection validation",
                &storage_configuration.connection_context.ssh_tunnel_manager,
            )
            .await?;

        let wal_level = mz_postgres_util::get_wal_level(&client).await?;

        if wal_level < mz_postgres_util::replication::WalLevel::Logical {
            Err(PostgresConnectionValidationError::InsufficientWalLevel { wal_level })?;
        }

        let max_wal_senders = mz_postgres_util::get_max_wal_senders(&client).await?;

        if max_wal_senders < 1 {
            Err(PostgresConnectionValidationError::ReplicationDisabled)?;
        }

        let available_replication_slots =
            mz_postgres_util::available_replication_slots(&client).await?;

        // We need 1 replication slot for the snapshots and 1 for the continuing replication
        if available_replication_slots < 2 {
            Err(
                PostgresConnectionValidationError::InsufficientReplicationSlotsAvailable {
                    count: 2,
                },
            )?;
        }

        Ok(client)
    }
}
1959
/// Problems that make a PostgreSQL server unsuitable for replication, as
/// reported by e.g. `PostgresConnection::validate`.
#[derive(Debug, Clone, thiserror::Error)]
pub enum PostgresConnectionValidationError {
    /// Too few replication slots are free. `count` is the number of slots the
    /// operation requires, not the number available.
    #[error("PostgreSQL server has insufficient number of replication slots available")]
    InsufficientReplicationSlotsAvailable { count: usize },
    /// The server's `wal_level` is below `logical`.
    #[error("server must have wal_level >= logical, but has {wal_level}")]
    InsufficientWalLevel {
        wal_level: mz_postgres_util::replication::WalLevel,
    },
    /// Replication is disabled (`max_wal_senders` is less than 1).
    #[error("replication disabled on server")]
    ReplicationDisabled,
}
1971
1972impl PostgresConnectionValidationError {
1973    pub fn detail(&self) -> Option<String> {
1974        match self {
1975            Self::InsufficientReplicationSlotsAvailable { count } => Some(format!(
1976                "executing this statement requires {} replication slot{}",
1977                count,
1978                if *count == 1 { "" } else { "s" }
1979            )),
1980            _ => None,
1981        }
1982    }
1983
1984    pub fn hint(&self) -> Option<String> {
1985        match self {
1986            Self::InsufficientReplicationSlotsAvailable { .. } => Some(
1987                "you might be able to wait for other sources to finish snapshotting and try again"
1988                    .into(),
1989            ),
1990            Self::ReplicationDisabled => Some("set max_wal_senders to a value > 0".into()),
1991            Self::InsufficientWalLevel { .. } => None,
1992        }
1993    }
1994}
1995
1996impl<C: ConnectionAccess> AlterCompatible for PostgresConnection<C> {
1997    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1998        let PostgresConnection {
1999            tunnel,
2000            // All non-tunnel options may change arbitrarily
2001            host: _,
2002            port: _,
2003            database: _,
2004            user: _,
2005            password: _,
2006            tls_mode: _,
2007            tls_root_cert: _,
2008            tls_identity: _,
2009        } = self;
2010
2011        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2012
2013        for (compatible, field) in compatibility_checks {
2014            if !compatible {
2015                tracing::warn!(
2016                    "PostgresConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2017                    self,
2018                    other
2019                );
2020
2021                return Err(AlterError { id });
2022            }
2023        }
2024        Ok(())
2025    }
2026}
2027
/// Specifies how to tunnel a connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Tunnel<C: ConnectionAccess = InlinedConnection> {
    /// No tunneling.
    Direct,
    /// Via the specified SSH tunnel connection.
    Ssh(SshTunnel<C>),
    /// Via the specified AWS PrivateLink connection.
    AwsPrivatelink(AwsPrivatelink),
    /// Via AWS PrivateLink connections selected by `MATCHING` broker rules.
    ///
    /// Only Kafka connections support this variant. The CSR, PostgreSQL, and
    /// MySQL code paths treat it as `unreachable!`.
    AwsPrivatelinks(AwsPrivatelinks),
}
2039
2040impl<R: ConnectionResolver> IntoInlineConnection<Tunnel, R> for Tunnel<ReferencedConnection> {
2041    fn into_inline_connection(self, r: R) -> Tunnel {
2042        match self {
2043            Tunnel::Direct => Tunnel::Direct,
2044            Tunnel::Ssh(ssh) => Tunnel::Ssh(ssh.into_inline_connection(r)),
2045            Tunnel::AwsPrivatelink(awspl) => Tunnel::AwsPrivatelink(awspl),
2046            Tunnel::AwsPrivatelinks(x) => Tunnel::AwsPrivatelinks(x),
2047        }
2048    }
2049}
2050
2051impl<C: ConnectionAccess> AlterCompatible for Tunnel<C> {
2052    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2053        let compatible = match (self, other) {
2054            (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o).is_ok(),
2055            (s, o) => s == o,
2056        };
2057
2058        if !compatible {
2059            tracing::warn!(
2060                "Tunnel incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
2061                self,
2062                other
2063            );
2064
2065            return Err(AlterError { id });
2066        }
2067
2068        Ok(())
2069    }
2070}
2071
/// Specifies which MySQL SSL Mode to use:
/// <https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-mode>
/// This is not available as an enum in the mysql-async crate, so we define our own.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum MySqlSslMode {
    /// Do not use TLS.
    Disabled,
    /// Use TLS, but accept any server certificate and skip hostname
    /// verification.
    Required,
    /// Use TLS and verify the server's certificate, but skip hostname
    /// verification.
    VerifyCa,
    /// Use TLS with both certificate and hostname verification.
    VerifyIdentity,
}
2082
/// A connection to a MySQL server.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct MySqlConnection<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// The username to authenticate as, given either inline or as a secret.
    pub user: StringOrSecret,
    /// The ID of the secret containing the password, if any.
    pub password: Option<CatalogItemId>,
    /// A tunnel through which to route traffic.
    ///
    /// `Tunnel::AwsPrivatelinks` is not supported for MySQL.
    pub tunnel: Tunnel<C>,
    /// Whether to use TLS for encryption, and whether to verify the server's
    /// certificate and identity.
    pub tls_mode: MySqlSslMode,
    /// An optional root TLS certificate in PEM format, used to verify the
    /// server's identity. Only applied for `VerifyCa` and `VerifyIdentity`.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication.
    pub tls_identity: Option<TlsIdentity>,
    /// Reference to the AWS connection information to be used for IAM authentication and
    /// assuming AWS roles.
    pub aws_connection: Option<AwsConnectionReference<C>>,
}
2107
2108impl<R: ConnectionResolver> IntoInlineConnection<MySqlConnection, R>
2109    for MySqlConnection<ReferencedConnection>
2110{
2111    fn into_inline_connection(self, r: R) -> MySqlConnection {
2112        let MySqlConnection {
2113            host,
2114            port,
2115            user,
2116            password,
2117            tunnel,
2118            tls_mode,
2119            tls_root_cert,
2120            tls_identity,
2121            aws_connection,
2122        } = self;
2123
2124        MySqlConnection {
2125            host,
2126            port,
2127            user,
2128            password,
2129            tunnel: tunnel.into_inline_connection(&r),
2130            tls_mode,
2131            tls_root_cert,
2132            tls_identity,
2133            aws_connection: aws_connection.map(|aws| aws.into_inline_connection(&r)),
2134        }
2135    }
2136}
2137
2138impl<C: ConnectionAccess> MySqlConnection<C> {
2139    fn validate_by_default(&self) -> bool {
2140        true
2141    }
2142}
2143
impl MySqlConnection<InlinedConnection> {
    /// Builds the `mz_mysql_util::Config` used to connect to this MySQL
    /// server.
    ///
    /// Reads the user, password, and TLS material through `secrets_reader`.
    /// Maps [`MySqlSslMode`] onto `mysql_async::SslOpts`. Translates the
    /// connection's [`Tunnel`] into a `mz_mysql_util::TunnelConfig`. If an
    /// AWS connection is referenced, loads its SDK config as well.
    ///
    /// `in_task` is forwarded to the secret reads (`*_in_task_if`), the AWS
    /// SDK config load, and `mz_mysql_util::Config::new`.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the following fails:
    /// - a secret cannot be read;
    /// - the client identity cannot be converted to PKCS#12;
    /// - an SSH key pair cannot be parsed;
    /// - host resolution via `resolve_address` fails;
    /// - loading the AWS SDK config fails;
    /// - `mz_mysql_util::Config::new` fails.
    ///
    /// # Panics
    ///
    /// Panics if an AWS PrivateLink tunnel carries a port, or if the tunnel is
    /// `Tunnel::AwsPrivatelinks`, which is only supported for Kafka.
    pub async fn config(
        &self,
        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_mysql_util::Config, anyhow::Error> {
        // TODO(roshan): Set appropriate connection timeouts
        let mut opts = mysql_async::OptsBuilder::default()
            .ip_or_hostname(&self.host)
            .tcp_port(self.port)
            .user(Some(&self.user.get_string(in_task, secrets_reader).await?));

        if let Some(password) = self.password {
            let password = secrets_reader
                .read_string_in_task_if(in_task, password)
                .await?;
            opts = opts.pass(Some(password));
        }

        // Our `MySqlSslMode` enum matches the official MySQL Client `--ssl-mode` parameter values
        // which uses opt-in security features (SSL, CA verification, & Identity verification).
        // The mysql_async crate `SslOpts` struct uses an opt-out mechanism for each of these, so
        // we need to appropriately disable features to match the intent of each enum value.
        let mut ssl_opts = match self.tls_mode {
            MySqlSslMode::Disabled => None,
            MySqlSslMode::Required => Some(
                mysql_async::SslOpts::default()
                    .with_danger_accept_invalid_certs(true)
                    .with_danger_skip_domain_validation(true),
            ),
            MySqlSslMode::VerifyCa => {
                Some(mysql_async::SslOpts::default().with_danger_skip_domain_validation(true))
            }
            MySqlSslMode::VerifyIdentity => Some(mysql_async::SslOpts::default()),
        };

        // A custom root certificate only matters when the server certificate
        // is actually verified; in `Required` mode it is ignored.
        if matches!(
            self.tls_mode,
            MySqlSslMode::VerifyCa | MySqlSslMode::VerifyIdentity
        ) {
            if let Some(tls_root_cert) = &self.tls_root_cert {
                let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
                ssl_opts = ssl_opts.map(|opts| {
                    opts.with_root_certs(vec![tls_root_cert.as_bytes().to_vec().into()])
                });
            }
        }

        // The client identity is converted from PEM to PKCS#12 DER. It is
        // attached only when TLS is enabled (`ssl_opts` is `Some`); with TLS
        // disabled, the secrets are still read but the identity is dropped.
        if let Some(identity) = &self.tls_identity {
            let key = secrets_reader
                .read_string_in_task_if(in_task, identity.key)
                .await?;
            let cert = identity.cert.get_string(in_task, secrets_reader).await?;
            let (der, pass) =
                mz_tls_util::pkcs12der_from_pem(key.as_bytes(), cert.as_bytes())?.into_parts();

            // Add client identity to SSLOpts
            ssl_opts = ssl_opts.map(|opts| {
                opts.with_client_identity(Some(
                    mysql_async::ClientIdentity::new(der.into()).with_password(pass),
                ))
            });
        }

        opts = opts.ssl_opts(ssl_opts);

        // Translate the connection's tunnel into the utility crate's tunnel
        // configuration.
        let tunnel = match &self.tunnel {
            Tunnel::Direct => {
                // Ensure any host we connect to is resolved to an external address.
                let resolved = resolve_address(
                    &self.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_mysql_util::TunnelConfig::Direct {
                    resolved_ips: Some(resolved),
                }
            }
            Tunnel::Ssh(SshTunnel {
                connection_id,
                connection,
            }) => {
                // The SSH key pair is stored as a secret under the SSH
                // connection's ID.
                let secret = secrets_reader
                    .read_in_task_if(in_task, *connection_id)
                    .await?;
                let key_pair = SshKeyPair::from_bytes(&secret)?;
                // Ensure any ssh-bastion host we connect to is resolved to an external address.
                let resolved = resolve_address(
                    &connection.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_mysql_util::TunnelConfig::Ssh {
                    config: SshTunnelConfig {
                        host: resolved
                            .iter()
                            .map(|a| a.to_string())
                            .collect::<BTreeSet<_>>(),
                        port: connection.port,
                        user: connection.user.clone(),
                        key_pair,
                    },
                }
            }
            Tunnel::AwsPrivatelink(connection) => {
                // A PrivateLink tunnel for MySQL must not carry a port; this
                // panics otherwise.
                assert_none!(connection.port);
                mz_mysql_util::TunnelConfig::AwsPrivatelink {
                    connection_id: connection.connection_id,
                }
            }
            Tunnel::AwsPrivatelinks(_) => {
                unreachable!("MATCHING broker rules are only available for Kafka connections.");
            }
        };

        // The AWS SDK config is loaded only when an AWS connection is
        // referenced. Per the field docs, it is used for IAM authentication
        // and role assumption.
        let aws_config = match self.aws_connection.as_ref() {
            None => None,
            Some(aws_ref) => Some(
                aws_ref
                    .connection
                    .load_sdk_config(
                        &storage_configuration.connection_context,
                        aws_ref.connection_id,
                        in_task,
                        ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                    )
                    .await?,
            ),
        };

        Ok(mz_mysql_util::Config::new(
            opts,
            tunnel,
            storage_configuration.parameters.ssh_timeout_config,
            in_task,
            storage_configuration
                .parameters
                .mysql_source_timeouts
                .clone(),
            aws_config,
        )?)
    }

    /// Connects to the server and checks its replication-related system
    /// settings.
    ///
    /// Three checks are always run: GTID consistency, full row-based binlog
    /// format, and replication commit order.
    /// - Every `MySqlError::InvalidSystemSetting` from these checks is
    ///   collected into one
    ///   [`MySqlConnectionValidationError::ReplicationSettingsError`].
    /// - Any other error from a check is returned immediately, in check order.
    ///
    /// On success, returns the open connection.
    pub async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<MySqlConn, MySqlConnectionValidationError> {
        let config = self
            .config(
                &storage_configuration.connection_context.secrets_reader,
                storage_configuration,
                // We are in a normal tokio context during validation, already.
                InTask::No,
            )
            .await?;
        let mut conn = config
            .connect(
                "connection validation",
                &storage_configuration.connection_context.ssh_tunnel_manager,
            )
            .await?;

        // Check if the MySQL database is configured to allow row-based consistent GTID replication
        let mut setting_errors = vec![];
        let gtid_res = mz_mysql_util::ensure_gtid_consistency(&mut conn).await;
        let binlog_res = mz_mysql_util::ensure_full_row_binlog_format(&mut conn).await;
        let order_res = mz_mysql_util::ensure_replication_commit_order(&mut conn).await;
        for res in [gtid_res, binlog_res, order_res] {
            match res {
                Err(MySqlError::InvalidSystemSetting {
                    setting,
                    expected,
                    actual,
                }) => {
                    setting_errors.push((setting, expected, actual));
                }
                Err(err) => Err(err)?,
                Ok(()) => {}
            }
        }
        if !setting_errors.is_empty() {
            Err(MySqlConnectionValidationError::ReplicationSettingsError(
                setting_errors,
            ))?;
        }

        Ok(conn)
    }
}
2335
2336#[derive(Debug, thiserror::Error)]
2337pub enum MySqlConnectionValidationError {
2338    #[error("Invalid MySQL system replication settings")]
2339    ReplicationSettingsError(Vec<(String, String, String)>),
2340    #[error(transparent)]
2341    Client(#[from] MySqlError),
2342    #[error("{}", .0.display_with_causes())]
2343    Other(#[from] anyhow::Error),
2344}
2345
2346impl MySqlConnectionValidationError {
2347    pub fn detail(&self) -> Option<String> {
2348        match self {
2349            Self::ReplicationSettingsError(settings) => Some(format!(
2350                "Invalid MySQL system replication settings: {}",
2351                itertools::join(
2352                    settings.iter().map(|(setting, expected, actual)| format!(
2353                        "{}: expected {}, got {}",
2354                        setting, expected, actual
2355                    )),
2356                    "; "
2357                )
2358            )),
2359            _ => None,
2360        }
2361    }
2362
2363    pub fn hint(&self) -> Option<String> {
2364        match self {
2365            Self::ReplicationSettingsError(_) => {
2366                Some("Set the necessary MySQL database system settings.".into())
2367            }
2368            _ => None,
2369        }
2370    }
2371}
2372
2373impl<C: ConnectionAccess> AlterCompatible for MySqlConnection<C> {
2374    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2375        let MySqlConnection {
2376            tunnel,
2377            // All non-tunnel options may change arbitrarily
2378            host: _,
2379            port: _,
2380            user: _,
2381            password: _,
2382            tls_mode: _,
2383            tls_root_cert: _,
2384            tls_identity: _,
2385            aws_connection: _,
2386        } = self;
2387
2388        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2389
2390        for (compatible, field) in compatibility_checks {
2391            if !compatible {
2392                tracing::warn!(
2393                    "MySqlConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2394                    self,
2395                    other
2396                );
2397
2398                return Err(AlterError { id });
2399            }
2400        }
2401        Ok(())
2402    }
2403}
2404
2405/// Details how to connect to an instance of Microsoft SQL Server.
2406///
2407/// For specifics of connecting to SQL Server for purposes of creating a
2408/// Materialize Source, see [`SqlServerSourceConnection`] which wraps this type.
2409///
2410/// [`SqlServerSourceConnection`]: crate::sources::SqlServerSourceConnection
2411#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2412pub struct SqlServerConnectionDetails<C: ConnectionAccess = InlinedConnection> {
2413    /// The hostname of the server.
2414    pub host: String,
2415    /// The port of the server.
2416    pub port: u16,
2417    /// Database we should connect to.
2418    pub database: String,
2419    /// The username to authenticate as.
2420    pub user: StringOrSecret,
2421    /// Password used for authentication.
2422    pub password: CatalogItemId,
2423    /// A tunnel through which to route traffic.
2424    pub tunnel: Tunnel<C>,
2425    /// Level of encryption to use for the connection.
2426    pub encryption: mz_sql_server_util::config::EncryptionLevel,
2427    /// Certificate validation policy
2428    pub certificate_validation_policy: mz_sql_server_util::config::CertificateValidationPolicy,
2429    /// TLS CA Certifiecate in PEM format
2430    pub tls_root_cert: Option<StringOrSecret>,
2431}
2432
2433impl<C: ConnectionAccess> SqlServerConnectionDetails<C> {
2434    fn validate_by_default(&self) -> bool {
2435        true
2436    }
2437}
2438
2439impl SqlServerConnectionDetails<InlinedConnection> {
2440    /// Attempts to open a connection to the upstream SQL Server instance.
2441    pub async fn validate(
2442        &self,
2443        _id: CatalogItemId,
2444        storage_configuration: &StorageConfiguration,
2445    ) -> Result<mz_sql_server_util::Client, anyhow::Error> {
2446        let config = self
2447            .resolve_config(
2448                &storage_configuration.connection_context.secrets_reader,
2449                storage_configuration,
2450                InTask::No,
2451            )
2452            .await?;
2453        tracing::debug!(?config, "Validating SQL Server connection");
2454
2455        let mut client = mz_sql_server_util::Client::connect(config).await?;
2456
2457        // Ensure the upstream SQL Server instance is configured to allow CDC.
2458        //
2459        // Run all of the checks necessary and collect the errors to provide the best
2460        // guidance as to which system settings need to be enabled.
2461        let mut replication_errors = vec![];
2462        for error in [
2463            mz_sql_server_util::inspect::ensure_database_cdc_enabled(&mut client).await,
2464            mz_sql_server_util::inspect::ensure_snapshot_isolation_enabled(&mut client).await,
2465            mz_sql_server_util::inspect::ensure_sql_server_agent_running(&mut client).await,
2466        ] {
2467            match error {
2468                Err(mz_sql_server_util::SqlServerError::InvalidSystemSetting {
2469                    name,
2470                    expected,
2471                    actual,
2472                }) => replication_errors.push((name, expected, actual)),
2473                Err(other) => Err(other)?,
2474                Ok(()) => (),
2475            }
2476        }
2477        if !replication_errors.is_empty() {
2478            Err(SqlServerConnectionValidationError::ReplicationSettingsError(replication_errors))?;
2479        }
2480
2481        Ok(client)
2482    }
2483
2484    /// Resolve all of the connection details (e.g. read from the [`SecretsReader`])
2485    /// so the returned [`Config`] can be used to open a connection with the
2486    /// upstream system.
2487    ///
2488    /// The provided [`InTask`] argument determines whether any I/O is run in an
2489    /// [`mz_ore::task`] (i.e. a different thread) or directly in the returned
2490    /// future. The main goal here is to prevent running I/O in timely threads.
2491    ///
2492    /// [`Config`]: mz_sql_server_util::Config
2493    pub async fn resolve_config(
2494        &self,
2495        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
2496        storage_configuration: &StorageConfiguration,
2497        in_task: InTask,
2498    ) -> Result<mz_sql_server_util::Config, anyhow::Error> {
2499        let dyncfg = storage_configuration.config_set();
2500        let mut inner_config = tiberius::Config::new();
2501
2502        // Setup default connection params.
2503        inner_config.host(&self.host);
2504        inner_config.port(self.port);
2505        inner_config.database(self.database.clone());
2506        inner_config.encryption(self.encryption.into());
2507        match self.certificate_validation_policy {
2508            mz_sql_server_util::config::CertificateValidationPolicy::TrustAll => {
2509                inner_config.trust_cert()
2510            }
2511            mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA => {
2512                inner_config.trust_cert_ca_pem(
2513                    self.tls_root_cert
2514                        .as_ref()
2515                        .unwrap()
2516                        .get_string(in_task, secrets_reader)
2517                        .await
2518                        .context("ca certificate")?,
2519                );
2520            }
2521            mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem => (), // no-op
2522        }
2523
2524        inner_config.application_name("materialize");
2525
2526        // Read our auth settings from
2527        let user = self
2528            .user
2529            .get_string(in_task, secrets_reader)
2530            .await
2531            .context("username")?;
2532        let password = secrets_reader
2533            .read_string_in_task_if(in_task, self.password)
2534            .await
2535            .context("password")?;
2536        // TODO(sql_server3): Support other methods of authentication besides
2537        // username and password.
2538        inner_config.authentication(tiberius::AuthMethod::sql_server(user, password));
2539
2540        // Prevent users from probing our internal network ports by trying to
2541        // connect to localhost, or another non-external IP.
2542        let enforce_external_addresses = ENFORCE_EXTERNAL_ADDRESSES.get(dyncfg);
2543
2544        let tunnel = match &self.tunnel {
2545            Tunnel::Direct => {
2546                let resolved_addresses: Vec<SocketAddr> =
2547                    resolve_address(&self.host, enforce_external_addresses)
2548                        .await?
2549                        .into_iter()
2550                        .map(|ip| SocketAddr::new(ip, self.port))
2551                        .collect();
2552                mz_sql_server_util::config::TunnelConfig::Direct {
2553                    resolved_addresses: resolved_addresses.into_boxed_slice(),
2554                }
2555            }
2556            Tunnel::Ssh(SshTunnel {
2557                connection_id,
2558                connection: ssh_connection,
2559            }) => {
2560                let secret = secrets_reader
2561                    .read_in_task_if(in_task, *connection_id)
2562                    .await
2563                    .context("ssh secret")?;
2564                let key_pair = SshKeyPair::from_bytes(&secret).context("ssh key pair")?;
2565                // Ensure any SSH-bastion host we connect to is resolved to an
2566                // external address.
2567                let addresses = resolve_address(&ssh_connection.host, enforce_external_addresses)
2568                    .await
2569                    .context("ssh tunnel")?;
2570
2571                let config = SshTunnelConfig {
2572                    host: addresses.into_iter().map(|a| a.to_string()).collect(),
2573                    port: ssh_connection.port,
2574                    user: ssh_connection.user.clone(),
2575                    key_pair,
2576                };
2577                mz_sql_server_util::config::TunnelConfig::Ssh {
2578                    config,
2579                    manager: storage_configuration
2580                        .connection_context
2581                        .ssh_tunnel_manager
2582                        .clone(),
2583                    timeout: storage_configuration.parameters.ssh_timeout_config.clone(),
2584                    host: self.host.clone(),
2585                    port: self.port,
2586                }
2587            }
2588            Tunnel::AwsPrivatelink(private_link_connection) => {
2589                assert_none!(private_link_connection.port);
2590                mz_sql_server_util::config::TunnelConfig::AwsPrivatelink {
2591                    connection_id: private_link_connection.connection_id,
2592                    port: self.port,
2593                }
2594            }
2595            Tunnel::AwsPrivatelinks(_) => {
2596                unreachable!("MATCHING broker rules are only available for Kafka connections.");
2597            }
2598        };
2599
2600        Ok(mz_sql_server_util::Config::new(
2601            inner_config,
2602            tunnel,
2603            in_task,
2604        ))
2605    }
2606}
2607
2608#[derive(Debug, Clone, thiserror::Error)]
2609pub enum SqlServerConnectionValidationError {
2610    #[error("Invalid SQL Server system replication settings")]
2611    ReplicationSettingsError(Vec<(String, String, String)>),
2612}
2613
2614impl SqlServerConnectionValidationError {
2615    pub fn detail(&self) -> Option<String> {
2616        match self {
2617            Self::ReplicationSettingsError(settings) => Some(format!(
2618                "Invalid SQL Server system replication settings: {}",
2619                itertools::join(
2620                    settings.iter().map(|(setting, expected, actual)| format!(
2621                        "{}: expected {}, got {}",
2622                        setting, expected, actual
2623                    )),
2624                    "; "
2625                )
2626            )),
2627        }
2628    }
2629
2630    pub fn hint(&self) -> Option<String> {
2631        match self {
2632            _ => None,
2633        }
2634    }
2635}
2636
2637impl<R: ConnectionResolver> IntoInlineConnection<SqlServerConnectionDetails, R>
2638    for SqlServerConnectionDetails<ReferencedConnection>
2639{
2640    fn into_inline_connection(self, r: R) -> SqlServerConnectionDetails {
2641        let SqlServerConnectionDetails {
2642            host,
2643            port,
2644            database,
2645            user,
2646            password,
2647            tunnel,
2648            encryption,
2649            certificate_validation_policy,
2650            tls_root_cert,
2651        } = self;
2652
2653        SqlServerConnectionDetails {
2654            host,
2655            port,
2656            database,
2657            user,
2658            password,
2659            tunnel: tunnel.into_inline_connection(&r),
2660            encryption,
2661            certificate_validation_policy,
2662            tls_root_cert,
2663        }
2664    }
2665}
2666
2667impl<C: ConnectionAccess> AlterCompatible for SqlServerConnectionDetails<C> {
2668    fn alter_compatible(
2669        &self,
2670        id: mz_repr::GlobalId,
2671        other: &Self,
2672    ) -> Result<(), crate::controller::AlterError> {
2673        let SqlServerConnectionDetails {
2674            tunnel,
2675            // TODO(sql_server2): Figure out how these variables are allowed to change.
2676            host: _,
2677            port: _,
2678            database: _,
2679            user: _,
2680            password: _,
2681            encryption: _,
2682            certificate_validation_policy: _,
2683            tls_root_cert: _,
2684        } = self;
2685
2686        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2687
2688        for (compatible, field) in compatibility_checks {
2689            if !compatible {
2690                tracing::warn!(
2691                    "SqlServerConnectionDetails incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2692                    self,
2693                    other
2694                );
2695
2696                return Err(AlterError { id });
2697            }
2698        }
2699        Ok(())
2700    }
2701}
2702
2703/// A connection to an SSH tunnel.
2704#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2705pub struct SshConnection {
2706    pub host: String,
2707    pub port: u16,
2708    pub user: String,
2709}
2710
2711use self::inline::{
2712    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
2713    ReferencedConnection,
2714};
2715
2716impl AlterCompatible for SshConnection {
2717    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
2718        // Every element of the SSH connection is configurable.
2719        Ok(())
2720    }
2721}
2722
2723/// Specifies an AWS PrivateLink service for a [`Tunnel`].
2724#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2725pub struct AwsPrivatelink {
2726    /// The ID of the connection to the AWS PrivateLink service.
2727    pub connection_id: CatalogItemId,
2728    // The availability zone to use when connecting to the AWS PrivateLink service.
2729    pub availability_zone: Option<String>,
2730    /// The port to use when connecting to the AWS PrivateLink service, if
2731    /// different from the port in [`KafkaBroker::address`].
2732    pub port: Option<u16>,
2733}
2734
2735impl AlterCompatible for AwsPrivatelink {
2736    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2737        let AwsPrivatelink {
2738            connection_id,
2739            availability_zone: _,
2740            port: _,
2741        } = self;
2742
2743        let compatibility_checks = [(connection_id == &other.connection_id, "connection_id")];
2744
2745        for (compatible, field) in compatibility_checks {
2746            if !compatible {
2747                tracing::warn!(
2748                    "AwsPrivatelink incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2749                    self,
2750                    other
2751                );
2752
2753                return Err(AlterError { id });
2754            }
2755        }
2756
2757        Ok(())
2758    }
2759}
2760
2761#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2762pub struct AwsPrivatelinks {
2763    /// Route to brokers through PrivateLink connections according to these rules.
2764    /// Exact-match rules (no wildcards) are used as bootstrap brokers.
2765    /// Wildcard rules are applied dynamically to discovered brokers.
2766    pub rules: Vec<AwsPrivatelinkRule>,
2767}
2768
2769#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2770pub struct AwsPrivatelinkRule {
2771    /// Given a broker's host:port, should we use this route?
2772    pub pattern: ConnectionRulePattern,
2773    /// Route to the broker through this PrivateLink connection.
2774    pub to: AwsPrivatelink,
2775}
2776
2777/// Specifies an SSH tunnel connection.
2778#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2779pub struct SshTunnel<C: ConnectionAccess = InlinedConnection> {
2780    /// id of the ssh connection
2781    pub connection_id: CatalogItemId,
2782    /// ssh connection object
2783    pub connection: C::Ssh,
2784}
2785
2786impl<R: ConnectionResolver> IntoInlineConnection<SshTunnel, R> for SshTunnel<ReferencedConnection> {
2787    fn into_inline_connection(self, r: R) -> SshTunnel {
2788        let SshTunnel {
2789            connection,
2790            connection_id,
2791        } = self;
2792
2793        SshTunnel {
2794            connection: r.resolve_connection(connection).unwrap_ssh(),
2795            connection_id,
2796        }
2797    }
2798}
2799
2800impl SshTunnel<InlinedConnection> {
2801    /// Like [`SshTunnelConfig::connect`], but the SSH key is loaded from a
2802    /// secret.
2803    async fn connect(
2804        &self,
2805        storage_configuration: &StorageConfiguration,
2806        remote_host: &str,
2807        remote_port: u16,
2808        in_task: InTask,
2809    ) -> Result<ManagedSshTunnelHandle, anyhow::Error> {
2810        // Ensure any ssh-bastion host we connect to is resolved to an external address.
2811        let resolved = resolve_address(
2812            &self.connection.host,
2813            ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2814        )
2815        .await?;
2816        storage_configuration
2817            .connection_context
2818            .ssh_tunnel_manager
2819            .connect(
2820                SshTunnelConfig {
2821                    host: resolved
2822                        .iter()
2823                        .map(|a| a.to_string())
2824                        .collect::<BTreeSet<_>>(),
2825                    port: self.connection.port,
2826                    user: self.connection.user.clone(),
2827                    key_pair: SshKeyPair::from_bytes(
2828                        &storage_configuration
2829                            .connection_context
2830                            .secrets_reader
2831                            .read_in_task_if(in_task, self.connection_id)
2832                            .await?,
2833                    )?,
2834                },
2835                remote_host,
2836                remote_port,
2837                storage_configuration.parameters.ssh_timeout_config,
2838                in_task,
2839            )
2840            .await
2841    }
2842}
2843
2844impl<C: ConnectionAccess> AlterCompatible for SshTunnel<C> {
2845    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2846        let SshTunnel {
2847            connection_id,
2848            connection,
2849        } = self;
2850
2851        let compatibility_checks = [
2852            (connection_id == &other.connection_id, "connection_id"),
2853            (
2854                connection.alter_compatible(id, &other.connection).is_ok(),
2855                "connection",
2856            ),
2857        ];
2858
2859        for (compatible, field) in compatibility_checks {
2860            if !compatible {
2861                tracing::warn!(
2862                    "SshTunnel incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2863                    self,
2864                    other
2865                );
2866
2867                return Err(AlterError { id });
2868            }
2869        }
2870
2871        Ok(())
2872    }
2873}
2874
2875impl SshConnection {
2876    #[allow(clippy::unused_async)]
2877    async fn validate(
2878        &self,
2879        id: CatalogItemId,
2880        storage_configuration: &StorageConfiguration,
2881    ) -> Result<(), anyhow::Error> {
2882        let secret = storage_configuration
2883            .connection_context
2884            .secrets_reader
2885            .read_in_task_if(
2886                // We are in a normal tokio context during validation, already.
2887                InTask::No,
2888                id,
2889            )
2890            .await?;
2891        let key_pair = SshKeyPair::from_bytes(&secret)?;
2892
2893        // Ensure any ssh-bastion host we connect to is resolved to an external address.
2894        let resolved = resolve_address(
2895            &self.host,
2896            ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2897        )
2898        .await?;
2899
2900        let config = SshTunnelConfig {
2901            host: resolved
2902                .iter()
2903                .map(|a| a.to_string())
2904                .collect::<BTreeSet<_>>(),
2905            port: self.port,
2906            user: self.user.clone(),
2907            key_pair,
2908        };
2909        // Note that we do NOT use the `SshTunnelManager` here, as we want to validate that we
2910        // can actually create a new connection to the ssh bastion, without tunneling.
2911        config
2912            .validate(storage_configuration.parameters.ssh_timeout_config)
2913            .await
2914    }
2915
2916    fn validate_by_default(&self) -> bool {
2917        false
2918    }
2919}
2920
2921impl AwsPrivatelinkConnection {
2922    #[allow(clippy::unused_async)]
2923    async fn validate(
2924        &self,
2925        id: CatalogItemId,
2926        storage_configuration: &StorageConfiguration,
2927    ) -> Result<(), anyhow::Error> {
2928        let Some(ref cloud_resource_reader) = storage_configuration
2929            .connection_context
2930            .cloud_resource_reader
2931        else {
2932            return Err(anyhow!("AWS PrivateLink connections are unsupported"));
2933        };
2934
2935        // No need to optionally run this in a task, as we are just validating from envd.
2936        let status = cloud_resource_reader.read(id).await?;
2937
2938        let availability = status
2939            .conditions
2940            .as_ref()
2941            .and_then(|conditions| conditions.iter().find(|c| c.type_ == "Available"));
2942
2943        match availability {
2944            Some(condition) if condition.status == "True" => Ok(()),
2945            Some(condition) => Err(anyhow!("{}", condition.message)),
2946            None => Err(anyhow!("Endpoint availability is unknown")),
2947        }
2948    }
2949
2950    fn validate_by_default(&self) -> bool {
2951        false
2952    }
2953}