Skip to main content

mz_storage_types/
connections.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Connection types.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::net::SocketAddr;
15use std::sync::Arc;
16use std::time::SystemTime;
17
18use anyhow::{Context, anyhow};
19use async_trait::async_trait;
20use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider};
21use aws_sigv4::http_request::{SignableBody, SignableRequest, SigningSettings, sign};
22use aws_sigv4::sign::v4;
23// Aliased to avoid colliding with `mz_ccsr::tls::Identity`.
24use aws_smithy_runtime_api::client::identity::Identity as AwsIdentity;
25use base64::Engine;
26use http::{HeaderName, HeaderValue};
27use iceberg::Catalog;
28use iceberg::CatalogBuilder;
29use iceberg::io::{
30    GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, GCS_DISABLE_VM_METADATA, GCS_USER_PROJECT,
31    S3_ACCESS_KEY_ID, S3_DISABLE_EC2_METADATA, S3_REGION, S3_SECRET_ACCESS_KEY,
32};
33use iceberg_catalog_rest::{
34    REST_CATALOG_PROP_URI, REST_CATALOG_PROP_WAREHOUSE, RequestAuthenticator, RestCatalogBuilder,
35};
36use iceberg_storage_opendal::{
37    AwsCredential, AwsCredentialLoad, CustomAwsCredentialLoader, OpenDalStorageFactory,
38};
39use itertools::Itertools;
40use mz_ccsr::tls::{Certificate, Identity};
41use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceReader, vpc_endpoint_host};
42use mz_dyncfg::ConfigSet;
43use mz_kafka_util::client::{
44    BrokerAddr, BrokerRewrite, HostMappingRules, MzClientContext, MzKafkaError, TunnelConfig,
45    TunnelingClientContext,
46};
47use mz_mysql_util::{MySqlConn, MySqlError};
48use mz_ore::assert_none;
49use mz_ore::error::ErrorExt;
50use mz_ore::future::{InTask, OreFutureExt};
51use mz_ore::netio::resolve_address;
52use mz_ore::num::NonNeg;
53use mz_repr::{CatalogItemId, GlobalId};
54use mz_secrets::SecretsReader;
55use mz_sql_parser::ast::ConnectionRulePattern;
56use mz_ssh_util::keys::SshKeyPair;
57use mz_ssh_util::tunnel::SshTunnelConfig;
58use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager};
59use mz_tracing::CloneableEnvFilter;
60use rdkafka::ClientContext;
61use rdkafka::config::FromClientConfigAndContext;
62use rdkafka::consumer::{BaseConsumer, Consumer};
63use regex::Regex;
64use reqwest::Request;
65use serde::{Deserialize, Deserializer, Serialize};
66use tokio::net;
67use tokio::runtime::Handle;
68use tokio_postgres::config::SslMode;
69use tracing::{debug, info, warn};
70use url::Url;
71
72use crate::AlterCompatible;
73use crate::configuration::StorageConfiguration;
74use crate::connections::aws::{
75    AwsAuth, AwsConnection, AwsConnectionReference, AwsConnectionValidationError,
76};
77use crate::connections::gcp::{GcpConnectionReference, GcpTokenProvider};
78use crate::connections::string_or_secret::StringOrSecret;
79use crate::controller::AlterError;
80use crate::dyncfgs::{
81    ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES,
82    KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM, KAFKA_RECONNECT_BACKOFF,
83    KAFKA_RECONNECT_BACKOFF_MAX, KAFKA_RETRY_BACKOFF, KAFKA_RETRY_BACKOFF_MAX,
84};
85use crate::errors::{ContextCreationError, CsrConnectError};
86
87pub mod aws;
88pub mod gcp;
89pub mod inline;
90pub mod string_or_secret;
91
/// Iceberg REST catalog property key `"scope"`.
///
/// NOTE(review): its use is outside this view; presumably it carries the OAuth2 scope of
/// [`IcebergCatalogAuth::OAuth`].
const REST_CATALOG_PROP_SCOPE: &str = "scope";
/// Iceberg REST catalog property key `"credential"`.
///
/// NOTE(review): its use is outside this view; presumably it carries the
/// `client_id:client_secret` credential of [`IcebergCatalogAuth::OAuth`].
const REST_CATALOG_PROP_CREDENTIAL: &str = "credential";
94
95/// A credential loader that wraps an aws-sdk-rust credentials provider for use with
96/// iceberg/OpenDAL. This allows us to provide refreshable credentials from the AWS SDK
97/// credential chain (including the full assume role chain) to OpenDAL's S3 implementation.
98///
99/// We use this instead of OpenDAL's built-in assume role support because Materialize
100/// has a runtime-defined credential chain (ambient → jump role → user role with external ID)
101/// that can't be expressed via OpenDAL's static configuration properties.
102struct AwsSdkCredentialLoader {
103    /// The underlying AWS SDK credentials provider. For assume role auth, this provider
104    /// already handles the full chain: ambient creds -> jump role -> user role.
105    provider: SharedCredentialsProvider,
106}
107
108impl AwsSdkCredentialLoader {
109    fn new(provider: SharedCredentialsProvider) -> Self {
110        Self { provider }
111    }
112}
113
114#[async_trait]
115impl AwsCredentialLoad for AwsSdkCredentialLoader {
116    async fn load_credential(
117        &self,
118        _client: reqwest::Client,
119    ) -> anyhow::Result<Option<AwsCredential>> {
120        let creds = self
121            .provider
122            .provide_credentials()
123            .await
124            .map_err(|e| {
125                warn!(
126                    error = %e.display_with_causes(),
127                    "failed to load AWS credentials for Iceberg FileIO from SDK provider"
128                );
129                e
130            })
131            .context(
132                "failed to load AWS credentials from SDK provider for Iceberg FileIO \
133                 (credential source may be temporarily unavailable)",
134            )?;
135
136        Ok(Some(AwsCredential {
137            access_key_id: creds.access_key_id().to_string(),
138            secret_access_key: creds.secret_access_key().to_string(),
139            session_token: creds.session_token().map(|s| s.to_string()),
140            expires_in: creds.expiry().map(|t| t.into()),
141        }))
142    }
143}
144
145/// Signs each outgoing REST-catalog request with AWS SigV4.
146///
147/// Holds a [`SharedCredentialsProvider`] (not static `Credentials`) so each
148/// request signs with refreshable creds from Materialize's chain
149/// (ambient -> jump role -> user role w/ external ID).
150struct Sigv4Authenticator {
151    provider: SharedCredentialsProvider,
152    region: String,
153    /// The AWS signing name. `"s3tables"` for AWS S3 Tables REST catalog.
154    signing_name: String,
155}
156
157impl std::fmt::Debug for Sigv4Authenticator {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        f.debug_struct("Sigv4Authenticator")
160            .field("region", &self.region)
161            .field("signing_name", &self.signing_name)
162            .finish_non_exhaustive()
163    }
164}
165
166fn sigv4_err(e: impl Into<anyhow::Error>) -> iceberg::Error {
167    iceberg::Error::new(iceberg::ErrorKind::DataInvalid, "AWS SigV4").with_source(e)
168}
169
170#[async_trait]
171impl RequestAuthenticator for Sigv4Authenticator {
172    async fn authenticate_request(&self, req: &mut Request) -> iceberg::Result<()> {
173        let creds = self
174            .provider
175            .provide_credentials()
176            .await
177            .map_err(sigv4_err)?;
178        let identity: AwsIdentity = creds.into();
179        let params = v4::SigningParams::builder()
180            .identity(&identity)
181            .region(&self.region)
182            .name(&self.signing_name)
183            .time(SystemTime::now())
184            .settings(SigningSettings::default())
185            .build()
186            .map_err(sigv4_err)?
187            .into();
188        let body: &[u8] = req
189            .body()
190            .map(|b| match b.as_bytes() {
191                Some(b) => Ok(b),
192                None => Err(iceberg::Error::new(
193                    iceberg::ErrorKind::FeatureUnsupported,
194                    "SigV4 Authenticator cannot sign a streaming request body.",
195                )),
196            })
197            .transpose()?
198            .unwrap_or_default();
199        let headers = req
200            .headers()
201            .iter()
202            .map(|(k, v)| {
203                Ok((
204                    k.as_str(),
205                    v.to_str().map_err(|_| {
206                        iceberg::Error::new(
207                            iceberg::ErrorKind::DataInvalid,
208                            format!("header '{}' value is not all visible ASCII", k),
209                        )
210                    })?,
211                ))
212            })
213            .collect::<iceberg::Result<Vec<(&str, &str)>>>()?;
214        let signable = SignableRequest::new(
215            req.method().as_str(),
216            req.url().as_str(),
217            headers.into_iter(),
218            SignableBody::Bytes(body),
219        )
220        .map_err(sigv4_err)?;
221        let (instructions, _sig) = sign(signable, &params).map_err(sigv4_err)?.into_parts();
222        let (new_headers, new_query) = instructions.into_parts();
223        for header in new_headers {
224            let mut value = HeaderValue::from_str(header.value()).map_err(sigv4_err)?;
225            value.set_sensitive(header.sensitive());
226            req.headers_mut()
227                .insert(HeaderName::from_static(header.name()), value);
228        }
229        if !new_query.is_empty() {
230            let url = req.url_mut();
231            let mut pairs = url.query_pairs_mut();
232            for (name, value) in new_query {
233                pairs.append_pair(name, &value);
234            }
235        }
236        Ok(())
237    }
238
239    // SigV4 is stateless: nothing to cache, invalidate, or refresh.
240    async fn invalidate_cache(&self) -> iceberg::Result<()> {
241        Ok(())
242    }
243    async fn regenerate_cache(&self) -> iceberg::Result<()> {
244        Ok(())
245    }
246}
247
248/// An extension trait for [`SecretsReader`]
249#[async_trait::async_trait]
250trait SecretsReaderExt {
251    /// `SecretsReader::read`, but optionally run in a task.
252    async fn read_in_task_if(
253        &self,
254        in_task: InTask,
255        id: CatalogItemId,
256    ) -> Result<Vec<u8>, anyhow::Error>;
257
258    /// `SecretsReader::read_string`, but optionally run in a task.
259    async fn read_string_in_task_if(
260        &self,
261        in_task: InTask,
262        id: CatalogItemId,
263    ) -> Result<String, anyhow::Error>;
264}
265
266#[async_trait::async_trait]
267impl SecretsReaderExt for Arc<dyn SecretsReader> {
268    async fn read_in_task_if(
269        &self,
270        in_task: InTask,
271        id: CatalogItemId,
272    ) -> Result<Vec<u8>, anyhow::Error> {
273        let sr = Arc::clone(self);
274        async move { sr.read(id).await }
275            .run_in_task_if(in_task, || "secrets_reader_read".to_string())
276            .await
277    }
278    async fn read_string_in_task_if(
279        &self,
280        in_task: InTask,
281        id: CatalogItemId,
282    ) -> Result<String, anyhow::Error> {
283        let sr = Arc::clone(self);
284        async move { sr.read_string(id).await }
285            .run_in_task_if(in_task, || "secrets_reader_read".to_string())
286            .await
287    }
288}
289
290/// Extra context to pass through when instantiating a connection for a source
291/// or sink.
292///
293/// Should be kept cheaply cloneable.
294#[derive(Debug, Clone)]
295pub struct ConnectionContext {
296    /// An opaque identifier for the environment in which this process is
297    /// running.
298    ///
299    /// The storage layer is intentionally unaware of the structure within this
300    /// identifier. Higher layers of the stack can make use of that structure,
301    /// but the storage layer should be oblivious to it.
302    pub environment_id: String,
303    /// The level for librdkafka's logs.
304    pub librdkafka_log_level: tracing::Level,
305    /// A prefix for an external ID to use for all AWS AssumeRole operations.
306    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
307    /// The ARN for a Materialize-controlled role to assume before assuming
308    /// a customer's requested role for an AWS connection.
309    pub aws_connection_role_arn: Option<String>,
310    /// A secrets reader.
311    pub secrets_reader: Arc<dyn SecretsReader>,
312    /// A cloud resource reader, if supported in this configuration.
313    pub cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
314    /// A manager for SSH tunnels.
315    pub ssh_tunnel_manager: SshTunnelManager,
316}
317
318impl ConnectionContext {
319    /// Constructs a new connection context from command line arguments.
320    ///
321    /// **WARNING:** it is critical for security that the `aws_external_id` be
322    /// provided by the operator of the Materialize service (i.e., via a CLI
323    /// argument or environment variable) and not the end user of Materialize
324    /// (e.g., via a configuration option in a SQL statement). See
325    /// [`AwsExternalIdPrefix`] for details.
326    pub fn from_cli_args(
327        environment_id: String,
328        startup_log_level: &CloneableEnvFilter,
329        aws_external_id_prefix: Option<AwsExternalIdPrefix>,
330        aws_connection_role_arn: Option<String>,
331        secrets_reader: Arc<dyn SecretsReader>,
332        cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
333    ) -> ConnectionContext {
334        ConnectionContext {
335            environment_id,
336            librdkafka_log_level: mz_ore::tracing::crate_level(
337                &startup_log_level.clone().into(),
338                "librdkafka",
339            ),
340            aws_external_id_prefix,
341            aws_connection_role_arn,
342            secrets_reader,
343            cloud_resource_reader,
344            ssh_tunnel_manager: SshTunnelManager::default(),
345        }
346    }
347
348    /// Constructs a new connection context for usage in tests.
349    pub fn for_tests(secrets_reader: Arc<dyn SecretsReader>) -> ConnectionContext {
350        ConnectionContext {
351            environment_id: "test-environment-id".into(),
352            librdkafka_log_level: tracing::Level::INFO,
353            aws_external_id_prefix: Some(
354                AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable(
355                    "test-aws-external-id-prefix",
356                )
357                .expect("infallible"),
358            ),
359            aws_connection_role_arn: Some(
360                "arn:aws:iam::123456789000:role/MaterializeConnection".into(),
361            ),
362            secrets_reader,
363            cloud_resource_reader: None,
364            ssh_tunnel_manager: SshTunnelManager::default(),
365        }
366    }
367}
368
369#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
370pub enum Connection<C: ConnectionAccess = InlinedConnection> {
371    Kafka(KafkaConnection<C>),
372    Csr(CsrConnection<C>),
373    GlueSchemaRegistry(GlueSchemaRegistryConnection<C>),
374    Postgres(PostgresConnection<C>),
375    Ssh(SshConnection),
376    Aws(AwsConnection),
377    AwsPrivatelink(AwsPrivatelinkConnection),
378    Gcp(gcp::GcpConnection),
379    MySql(MySqlConnection<C>),
380    SqlServer(SqlServerConnectionDetails<C>),
381    IcebergCatalog(IcebergCatalogConnection<C>),
382}
383
384impl<R: ConnectionResolver> IntoInlineConnection<Connection, R>
385    for Connection<ReferencedConnection>
386{
387    fn into_inline_connection(self, r: R) -> Connection {
388        match self {
389            Connection::Kafka(kafka) => Connection::Kafka(kafka.into_inline_connection(r)),
390            Connection::Csr(csr) => Connection::Csr(csr.into_inline_connection(r)),
391            Connection::GlueSchemaRegistry(glue) => {
392                Connection::GlueSchemaRegistry(glue.into_inline_connection(r))
393            }
394            Connection::Postgres(pg) => Connection::Postgres(pg.into_inline_connection(r)),
395            Connection::Ssh(ssh) => Connection::Ssh(ssh),
396            Connection::Aws(aws) => Connection::Aws(aws),
397            Connection::AwsPrivatelink(awspl) => Connection::AwsPrivatelink(awspl),
398            Connection::Gcp(gcp) => Connection::Gcp(gcp),
399            Connection::MySql(mysql) => Connection::MySql(mysql.into_inline_connection(r)),
400            Connection::SqlServer(sql_server) => {
401                Connection::SqlServer(sql_server.into_inline_connection(r))
402            }
403            Connection::IcebergCatalog(iceberg) => {
404                Connection::IcebergCatalog(iceberg.into_inline_connection(r))
405            }
406        }
407    }
408}
409
410impl<C: ConnectionAccess> Connection<C> {
411    /// Whether this connection should be validated by default on creation.
412    pub fn validate_by_default(&self) -> bool {
413        match self {
414            Connection::Kafka(conn) => conn.validate_by_default(),
415            Connection::Csr(conn) => conn.validate_by_default(),
416            Connection::GlueSchemaRegistry(conn) => conn.validate_by_default(),
417            Connection::Postgres(conn) => conn.validate_by_default(),
418            Connection::Ssh(conn) => conn.validate_by_default(),
419            Connection::Aws(conn) => conn.validate_by_default(),
420            Connection::AwsPrivatelink(conn) => conn.validate_by_default(),
421            Connection::Gcp(conn) => conn.validate_by_default(),
422            Connection::MySql(conn) => conn.validate_by_default(),
423            Connection::SqlServer(conn) => conn.validate_by_default(),
424            Connection::IcebergCatalog(conn) => conn.validate_by_default(),
425        }
426    }
427}
428
429impl Connection<InlinedConnection> {
430    /// Validates this connection by attempting to connect to the upstream system.
431    pub async fn validate(
432        &self,
433        id: CatalogItemId,
434        storage_configuration: &StorageConfiguration,
435    ) -> Result<(), ConnectionValidationError> {
436        match self {
437            Connection::Kafka(conn) => conn.validate(id, storage_configuration).await?,
438            Connection::Csr(conn) => conn.validate(id, storage_configuration).await?,
439            Connection::GlueSchemaRegistry(conn) => {
440                conn.validate(id, storage_configuration).await?
441            }
442            Connection::Postgres(conn) => {
443                conn.validate(id, storage_configuration).await?;
444            }
445            Connection::Ssh(conn) => conn.validate(id, storage_configuration).await?,
446            Connection::Aws(conn) => conn.validate(id, storage_configuration).await?,
447            Connection::AwsPrivatelink(conn) => conn.validate(id, storage_configuration).await?,
448            Connection::Gcp(conn) => conn.validate(id, storage_configuration).await?,
449            Connection::MySql(conn) => {
450                conn.validate(id, storage_configuration).await?;
451            }
452            Connection::SqlServer(conn) => {
453                conn.validate(id, storage_configuration).await?;
454            }
455            Connection::IcebergCatalog(conn) => conn.validate(id, storage_configuration).await?,
456        }
457        Ok(())
458    }
459
460    pub fn unwrap_kafka(self) -> <InlinedConnection as ConnectionAccess>::Kafka {
461        match self {
462            Self::Kafka(conn) => conn,
463            o => unreachable!("{o:?} is not a Kafka connection"),
464        }
465    }
466
467    pub fn unwrap_pg(self) -> <InlinedConnection as ConnectionAccess>::Pg {
468        match self {
469            Self::Postgres(conn) => conn,
470            o => unreachable!("{o:?} is not a Postgres connection"),
471        }
472    }
473
474    pub fn unwrap_mysql(self) -> <InlinedConnection as ConnectionAccess>::MySql {
475        match self {
476            Self::MySql(conn) => conn,
477            o => unreachable!("{o:?} is not a MySQL connection"),
478        }
479    }
480
481    pub fn unwrap_sql_server(self) -> <InlinedConnection as ConnectionAccess>::SqlServer {
482        match self {
483            Self::SqlServer(conn) => conn,
484            o => unreachable!("{o:?} is not a SQL Server connection"),
485        }
486    }
487
488    pub fn unwrap_aws(self) -> <InlinedConnection as ConnectionAccess>::Aws {
489        match self {
490            Self::Aws(conn) => conn,
491            o => unreachable!("{o:?} is not an AWS connection"),
492        }
493    }
494
495    pub fn unwrap_gcp(self) -> <InlinedConnection as ConnectionAccess>::Gcp {
496        match self {
497            Self::Gcp(conn) => conn,
498            o => unreachable!("{o:?} is not a GCP connection"),
499        }
500    }
501
502    pub fn unwrap_ssh(self) -> <InlinedConnection as ConnectionAccess>::Ssh {
503        match self {
504            Self::Ssh(conn) => conn,
505            o => unreachable!("{o:?} is not an SSH connection"),
506        }
507    }
508
509    pub fn unwrap_csr(self) -> <InlinedConnection as ConnectionAccess>::Csr {
510        match self {
511            Self::Csr(conn) => conn,
512            o => unreachable!("{o:?} is not a Kafka connection"),
513        }
514    }
515
516    pub fn unwrap_glue_schema_registry(
517        self,
518    ) -> <InlinedConnection as ConnectionAccess>::GlueSchemaRegistry {
519        match self {
520            Self::GlueSchemaRegistry(conn) => conn,
521            o => unreachable!("{o:?} is not an AWS Glue Schema Registry connection"),
522        }
523    }
524
525    pub fn unwrap_iceberg_catalog(self) -> <InlinedConnection as ConnectionAccess>::IcebergCatalog {
526        match self {
527            Self::IcebergCatalog(conn) => conn,
528            o => unreachable!("{o:?} is not an Iceberg catalog connection"),
529        }
530    }
531}
532
533/// An error returned by [`Connection::validate`].
534#[derive(thiserror::Error, Debug)]
535pub enum ConnectionValidationError {
536    #[error(transparent)]
537    Postgres(#[from] PostgresConnectionValidationError),
538    #[error(transparent)]
539    MySql(#[from] MySqlConnectionValidationError),
540    #[error(transparent)]
541    SqlServer(#[from] SqlServerConnectionValidationError),
542    #[error(transparent)]
543    Aws(#[from] AwsConnectionValidationError),
544    #[error(transparent)]
545    Gcp(#[from] gcp::GcpConnectionValidationError),
546    #[error("{}", .0.display_with_causes())]
547    Other(#[from] anyhow::Error),
548}
549
550impl ConnectionValidationError {
551    /// Reports additional details about the error, if any are available.
552    pub fn detail(&self) -> Option<String> {
553        match self {
554            ConnectionValidationError::Postgres(e) => e.detail(),
555            ConnectionValidationError::MySql(e) => e.detail(),
556            ConnectionValidationError::SqlServer(e) => e.detail(),
557            ConnectionValidationError::Aws(e) => e.detail(),
558            ConnectionValidationError::Gcp(e) => e.detail(),
559            ConnectionValidationError::Other(_) => None,
560        }
561    }
562
563    /// Reports a hint for the user about how the error could be fixed.
564    pub fn hint(&self) -> Option<String> {
565        match self {
566            ConnectionValidationError::Postgres(e) => e.hint(),
567            ConnectionValidationError::MySql(e) => e.hint(),
568            ConnectionValidationError::SqlServer(e) => e.hint(),
569            ConnectionValidationError::Aws(e) => e.hint(),
570            ConnectionValidationError::Gcp(e) => e.hint(),
571            ConnectionValidationError::Other(_) => None,
572        }
573    }
574}
575
576impl<C: ConnectionAccess> AlterCompatible for Connection<C> {
577    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
578        match (self, other) {
579            (Self::Aws(s), Self::Aws(o)) => s.alter_compatible(id, o),
580            (Self::AwsPrivatelink(s), Self::AwsPrivatelink(o)) => s.alter_compatible(id, o),
581            (Self::Gcp(s), Self::Gcp(o)) => s.alter_compatible(id, o),
582            (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o),
583            (Self::Csr(s), Self::Csr(o)) => s.alter_compatible(id, o),
584            (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
585            (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
586            (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
587            _ => {
588                tracing::warn!(
589                    "Connection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
590                    self,
591                    other
592                );
593                Err(AlterError { id })
594            }
595        }
596    }
597}
598
599/// Auth mechanism for Iceberg REST catalogs.
600#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
601pub enum IcebergCatalogAuth<C: ConnectionAccess = InlinedConnection> {
602    /// Use Iceberg catalog REST API's standard OAuth flow.
603    OAuth {
604        /// client_id:client_secret
605        credential: StringOrSecret,
606        /// OAuth2 scope
607        scope: Option<String>,
608    },
609    Gcp(GcpConnectionReference<C>),
610}
611
612#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
613pub struct RestIcebergCatalog<C: ConnectionAccess = InlinedConnection> {
614    pub auth: IcebergCatalogAuth<C>,
615    /// The warehouse for REST catalogs
616    pub warehouse: Option<String>,
617}
618
619#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
620pub struct S3TablesRestIcebergCatalog<C: ConnectionAccess = InlinedConnection> {
621    /// The AWS connection details, for s3tables
622    pub aws_connection: AwsConnectionReference<C>,
623    /// The warehouse for s3tables
624    pub warehouse: String,
625}
626
627impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogAuth, R>
628    for IcebergCatalogAuth<ReferencedConnection>
629{
630    fn into_inline_connection(self, r: R) -> IcebergCatalogAuth {
631        match self {
632            IcebergCatalogAuth::Gcp(x) => IcebergCatalogAuth::Gcp(x.into_inline_connection(&r)),
633            IcebergCatalogAuth::OAuth { credential, scope } => {
634                IcebergCatalogAuth::OAuth { credential, scope }
635            }
636        }
637    }
638}
639
640impl<R: ConnectionResolver> IntoInlineConnection<RestIcebergCatalog, R>
641    for RestIcebergCatalog<ReferencedConnection>
642{
643    fn into_inline_connection(self, r: R) -> RestIcebergCatalog {
644        RestIcebergCatalog {
645            auth: self.auth.into_inline_connection(&r),
646            warehouse: self.warehouse,
647        }
648    }
649}
650
651impl<R: ConnectionResolver> IntoInlineConnection<S3TablesRestIcebergCatalog, R>
652    for S3TablesRestIcebergCatalog<ReferencedConnection>
653{
654    fn into_inline_connection(self, r: R) -> S3TablesRestIcebergCatalog {
655        S3TablesRestIcebergCatalog {
656            aws_connection: self.aws_connection.into_inline_connection(&r),
657            warehouse: self.warehouse,
658        }
659    }
660}
661
662#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
663pub enum IcebergCatalogType {
664    Rest,
665    S3TablesRest,
666}
667
668#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
669pub enum IcebergCatalogImpl<C: ConnectionAccess = InlinedConnection> {
670    Rest(RestIcebergCatalog<C>),
671    S3TablesRest(S3TablesRestIcebergCatalog<C>),
672}
673
674impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogImpl, R>
675    for IcebergCatalogImpl<ReferencedConnection>
676{
677    fn into_inline_connection(self, r: R) -> IcebergCatalogImpl {
678        match self {
679            IcebergCatalogImpl::Rest(rest) => {
680                IcebergCatalogImpl::Rest(rest.into_inline_connection(r))
681            }
682            IcebergCatalogImpl::S3TablesRest(s3tables) => {
683                IcebergCatalogImpl::S3TablesRest(s3tables.into_inline_connection(r))
684            }
685        }
686    }
687}
688
689#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
690pub struct IcebergCatalogConnection<C: ConnectionAccess = InlinedConnection> {
691    /// The catalog impl impl of that catalog
692    pub catalog: IcebergCatalogImpl<C>,
693    /// Where the catalog is located
694    pub uri: reqwest::Url,
695}
696
697impl AlterCompatible for IcebergCatalogConnection {
698    fn alter_compatible(&self, id: GlobalId, _other: &Self) -> Result<(), AlterError> {
699        Err(AlterError { id })
700    }
701}
702
703impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogConnection, R>
704    for IcebergCatalogConnection<ReferencedConnection>
705{
706    fn into_inline_connection(self, r: R) -> IcebergCatalogConnection {
707        IcebergCatalogConnection {
708            catalog: self.catalog.into_inline_connection(&r),
709            uri: self.uri,
710        }
711    }
712}
713
714impl<C: ConnectionAccess> IcebergCatalogConnection<C> {
715    fn validate_by_default(&self) -> bool {
716        true
717    }
718}
719
720impl IcebergCatalogConnection<InlinedConnection> {
721    pub async fn connect(
722        &self,
723        storage_configuration: &StorageConfiguration,
724        in_task: InTask,
725    ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
726        match self.catalog {
727            IcebergCatalogImpl::S3TablesRest(ref s3tables) => {
728                self.connect_s3tables(s3tables, storage_configuration, in_task)
729                    .await
730            }
731            IcebergCatalogImpl::Rest(ref rest) => {
732                self.connect_rest(rest, storage_configuration, in_task)
733                    .await
734            }
735        }
736    }
737
738    pub fn catalog_type(&self) -> IcebergCatalogType {
739        match self.catalog {
740            IcebergCatalogImpl::S3TablesRest(_) => IcebergCatalogType::S3TablesRest,
741            IcebergCatalogImpl::Rest(_) => IcebergCatalogType::Rest,
742        }
743    }
744
745    pub fn s3tables_catalog(&self) -> Option<&S3TablesRestIcebergCatalog> {
746        match &self.catalog {
747            IcebergCatalogImpl::S3TablesRest(s3tables) => Some(s3tables),
748            IcebergCatalogImpl::Rest(_) => None,
749        }
750    }
751
752    pub fn rest_catalog(&self) -> Option<&RestIcebergCatalog> {
753        match &self.catalog {
754            IcebergCatalogImpl::Rest(rest) => Some(rest),
755            IcebergCatalogImpl::S3TablesRest(_) => None,
756        }
757    }
758
759    async fn connect_s3tables(
760        &self,
761        s3tables: &S3TablesRestIcebergCatalog,
762        storage_configuration: &StorageConfiguration,
763        in_task: InTask,
764    ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
765        let secret_reader = &storage_configuration.connection_context.secrets_reader;
766        let aws_ref = &s3tables.aws_connection;
767        let aws_config = aws_ref
768            .connection
769            .load_sdk_config(
770                &storage_configuration.connection_context,
771                aws_ref.connection_id,
772                in_task,
773                ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
774            )
775            .await
776            .with_context(|| {
777                format!(
778                    "failed to load AWS SDK config for S3 Tables Iceberg catalog \
779                     (connection id: {}, auth method: {}, catalog uri: {}, warehouse: {})",
780                    aws_ref.connection_id,
781                    aws_ref.connection.auth_method(),
782                    self.uri,
783                    s3tables.warehouse
784                )
785            })?;
786
787        let aws_region = aws_ref
788            .connection
789            .region
790            .clone()
791            .unwrap_or_else(|| "us-east-1".to_string());
792
793        let mut props = vec![
794            (S3_REGION.to_string(), aws_region.clone()),
795            (S3_DISABLE_EC2_METADATA.to_string(), "true".to_string()),
796            (
797                REST_CATALOG_PROP_WAREHOUSE.to_string(),
798                s3tables.warehouse.clone(),
799            ),
800            (REST_CATALOG_PROP_URI.to_string(), self.uri.to_string()),
801        ];
802
803        let aws_auth = aws_ref.connection.auth.clone();
804
805        if let AwsAuth::Credentials(creds) = &aws_auth {
806            props.push((
807                S3_ACCESS_KEY_ID.to_string(),
808                creds
809                    .access_key_id
810                    .get_string(in_task, secret_reader)
811                    .await?,
812            ));
813            props.push((
814                S3_SECRET_ACCESS_KEY.to_string(),
815                secret_reader.read_string(creds.secret_access_key).await?,
816            ));
817        }
818
819        // Sign REST catalog requests with the Materialize AWS credential chain
820        // via a custom `RequestAuthenticator`. For AssumeRole auth, also feed
821        // the chain to OpenDAL's S3 loader so data-file IO uses the same creds.
822        let credentials_provider = aws_config
823            .credentials_provider()
824            .ok_or_else(|| anyhow!("aws_config missing credentials provider"))?;
825
826        let authenticator = Arc::new(Sigv4Authenticator {
827            provider: credentials_provider.clone(),
828            region: aws_region.clone(),
829            signing_name: "s3tables".to_string(),
830        });
831
832        // N.B. We're using the AWS credentials from the catalog connection for the storage layer
833        //   even though the sink comes with its own (unused) AWS credentials for storage.
834        let customized_credential_load = if matches!(aws_auth, AwsAuth::AssumeRole(_)) {
835            Some(CustomAwsCredentialLoader::new(Arc::new(
836                AwsSdkCredentialLoader::new(credentials_provider),
837            )))
838        } else {
839            None
840        };
841
842        let storage_factory = Arc::new(OpenDalStorageFactory::S3 {
843            configured_scheme: "s3".to_string(),
844            customized_credential_load,
845        });
846
847        let catalog = RestCatalogBuilder::default()
848            .with_storage_factory(storage_factory)
849            .with_authenticator(authenticator)
850            .load("IcebergCatalog", props.into_iter().collect())
851            .await
852            .with_context(|| {
853                format!(
854                    "failed to create S3 Tables Iceberg catalog \
855                     (connection id: {}, catalog uri: {}, warehouse: {})",
856                    aws_ref.connection_id, self.uri, s3tables.warehouse
857                )
858            })?;
859
860        Ok(Arc::new(catalog))
861    }
862
863    async fn connect_rest(
864        &self,
865        rest: &RestIcebergCatalog,
866        storage_configuration: &StorageConfiguration,
867        in_task: InTask,
868    ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
869        let mut props = BTreeMap::from([(
870            REST_CATALOG_PROP_URI.to_string(),
871            self.uri.to_string().clone(),
872        )]);
873
874        if let Some(warehouse) = &rest.warehouse {
875            props.insert(REST_CATALOG_PROP_WAREHOUSE.to_string(), warehouse.clone());
876        }
877
878        // Catalog auth is configured through a combination of `props` and `.with_authenticator(...)`,
879        // which happen at different stages of the [`RestCatalogBuilder`] -> [`RestCatalog`]
880        // construction pipeline.
881        let (storage_factory, custom_authenticator) = match &rest.auth {
882            IcebergCatalogAuth::OAuth { credential, scope } => {
883                let credential = credential
884                    .get_string(
885                        in_task,
886                        &storage_configuration.connection_context.secrets_reader,
887                    )
888                    .await
889                    .map_err(|e| anyhow!("failed to read Iceberg catalog credential: {e}"))?;
890                props.insert(REST_CATALOG_PROP_CREDENTIAL.to_string(), credential);
891
892                if let Some(scope) = scope {
893                    props.insert(REST_CATALOG_PROP_SCOPE.to_string(), scope.clone());
894                }
895                (
896                    OpenDalStorageFactory::S3 {
897                        configured_scheme: "s3".to_string(),
898                        // When used with MinIO, Polaris returns a config with:
899                        //   s3.access-key-id, s3.secret-access-key, s3.endpoint, ...
900                        // `iceberg-rust` forwards these props to `opendal`.
901                        // N.B. This is not confirmed to work with other catalog & storage implementations.
902                        customized_credential_load: None,
903                    },
904                    None,
905                )
906            }
907            IcebergCatalogAuth::Gcp(gcp_connection_reference) => {
908                let (creds_json, service_account) = gcp_connection_reference
909                    .connection
910                    .read_credentials(storage_configuration)
911                    .await
912                    .map_err(|e| anyhow!("failed to parse GCP service account JSON: {e}"))?;
913
914                props.insert(
915                    GCS_CREDENTIALS_JSON.to_owned(),
916                    base64::engine::general_purpose::STANDARD.encode(creds_json),
917                );
918                // We supplied a service account key. Don't look elsewhere for GCP credentials.
919                props.insert(GCS_DISABLE_VM_METADATA.to_owned(), "true".to_owned());
920                props.insert(GCS_DISABLE_CONFIG_LOAD.to_owned(), "true".to_owned());
921                if let Some(project_id) = service_account.project_id() {
922                    props.insert(GCS_USER_PROJECT.to_owned(), project_id.to_owned());
923                    props.insert(
924                        "header.x-goog-user-project".to_owned(),
925                        project_id.to_owned(),
926                    );
927                }
928
929                (
930                    OpenDalStorageFactory::Gcs,
931                    Some(iceberg_catalog_rest::BearerTokenAuthenticator::new(
932                        Arc::new(GcpTokenProvider { service_account }),
933                    )),
934                )
935            }
936        };
937
938        let mut catalog =
939            RestCatalogBuilder::default().with_storage_factory(Arc::new(storage_factory));
940        if let Some(auth) = custom_authenticator {
941            catalog = catalog.with_authenticator(Arc::new(auth));
942        }
943        let catalog = catalog
944            .load("IcebergCatalog", props.into_iter().collect())
945            .await
946            .map_err(|e| anyhow!("failed to create Iceberg catalog: {e}"))?;
947        Ok(Arc::new(catalog))
948    }
949
950    async fn validate(
951        &self,
952        _id: CatalogItemId,
953        storage_configuration: &StorageConfiguration,
954    ) -> Result<(), ConnectionValidationError> {
955        let catalog = self
956            .connect(storage_configuration, InTask::No)
957            .await
958            .map_err(|e| {
959                ConnectionValidationError::Other(anyhow!("failed to connect to catalog: {e}"))
960            })?;
961
962        // If we can list namespaces, the connection is valid.
963        catalog.list_namespaces(None).await.map_err(|e| {
964            ConnectionValidationError::Other(anyhow!("failed to list namespaces: {e}"))
965        })?;
966
967        Ok(())
968    }
969}
970
/// An AWS PrivateLink connection.
///
/// NOTE(review): presumably `service_name` names the VPC endpoint service to
/// connect to, and `availability_zones` lists the zones in which endpoints are
/// created — confirm against the planner/controller.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelinkConnection {
    pub service_name: String,
    pub availability_zones: Vec<String>,
}

impl AlterCompatible for AwsPrivatelinkConnection {
    /// Always succeeds: any field of an AWS PrivateLink connection may be altered.
    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        // Every element of the AwsPrivatelinkConnection connection is configurable.
        Ok(())
    }
}
983
/// TLS settings for a [`KafkaConnection`].
///
/// When set, the Kafka client uses the `SSL` security protocol, or
/// `SASL_SSL` if SASL is also configured.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaTlsConfig {
    /// Client identity. Its key is passed to librdkafka as `ssl.key.pem` and
    /// its certificate as `ssl.certificate.pem`.
    pub identity: Option<TlsIdentity>,
    /// CA certificate, passed to librdkafka as `ssl.ca.pem`.
    pub root_cert: Option<StringOrSecret>,
}
989
990#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
991pub struct KafkaSaslConfig<C: ConnectionAccess = InlinedConnection> {
992    pub mechanism: String,
993    pub username: StringOrSecret,
994    pub password: Option<CatalogItemId>,
995    pub aws: Option<AwsConnectionReference<C>>,
996}
997
998impl<R: ConnectionResolver> IntoInlineConnection<KafkaSaslConfig, R>
999    for KafkaSaslConfig<ReferencedConnection>
1000{
1001    fn into_inline_connection(self, r: R) -> KafkaSaslConfig {
1002        KafkaSaslConfig {
1003            mechanism: self.mechanism,
1004            username: self.username,
1005            password: self.password,
1006            aws: self.aws.map(|aws| aws.into_inline_connection(&r)),
1007        }
1008    }
1009}
1010
1011/// Specifies a Kafka broker in a [`KafkaConnection`].
1012#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
1013pub struct KafkaBroker<C: ConnectionAccess = InlinedConnection> {
1014    /// The address of the Kafka broker.
1015    pub address: String,
1016    /// An optional tunnel to use when connecting to the broker.
1017    pub tunnel: Tunnel<C>,
1018}
1019
1020impl<R: ConnectionResolver> IntoInlineConnection<KafkaBroker, R>
1021    for KafkaBroker<ReferencedConnection>
1022{
1023    fn into_inline_connection(self, r: R) -> KafkaBroker {
1024        let KafkaBroker { address, tunnel } = self;
1025        KafkaBroker {
1026            address,
1027            tunnel: tunnel.into_inline_connection(r),
1028        }
1029    }
1030}
1031
/// Options for creating a Kafka topic.
///
/// A [`KafkaConnection`] carries one of these for its progress topic.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Default)]
pub struct KafkaTopicOptions {
    /// The replication factor for the topic.
    /// If `None`, the broker default will be used.
    pub replication_factor: Option<NonNeg<i32>>,
    /// The number of partitions to create.
    /// If `None`, the broker default will be used.
    pub partition_count: Option<NonNeg<i32>>,
    /// The initial configuration parameters for the topic.
    pub topic_config: BTreeMap<String, String>,
}
1043
/// A connection to a Kafka cluster.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaConnection<C: ConnectionAccess = InlinedConnection> {
    /// Statically configured brokers, each with its own tunnel.
    ///
    /// Unless the default tunnel is a single AWS PrivateLink connection,
    /// their addresses form `bootstrap.servers`.
    pub brokers: Vec<KafkaBroker<C>>,
    /// A tunnel through which to route traffic,
    /// that can be overridden for individual brokers
    /// in `brokers`.
    pub default_tunnel: Tunnel<C>,
    /// An explicit progress topic name.
    ///
    /// If `None`, a name derived from the environment ID and the connection ID
    /// is used.
    pub progress_topic: Option<String>,
    /// Topic options for the progress topic (presumably applied when it is
    /// created).
    pub progress_topic_options: KafkaTopicOptions,
    /// Additional librdkafka options, keyed by property name. Values may
    /// reference secrets.
    pub options: BTreeMap<String, StringOrSecret>,
    /// TLS settings. Selects the `SSL`/`SASL_SSL` security protocol when set.
    pub tls: Option<KafkaTlsConfig>,
    /// SASL settings. Selects the `SASL_PLAINTEXT`/`SASL_SSL` security
    /// protocol when set.
    pub sasl: Option<KafkaSaslConfig<C>>,
}
1057
1058impl<R: ConnectionResolver> IntoInlineConnection<KafkaConnection, R>
1059    for KafkaConnection<ReferencedConnection>
1060{
1061    fn into_inline_connection(self, r: R) -> KafkaConnection {
1062        let KafkaConnection {
1063            brokers,
1064            progress_topic,
1065            progress_topic_options,
1066            default_tunnel,
1067            options,
1068            tls,
1069            sasl,
1070        } = self;
1071
1072        let brokers = brokers
1073            .into_iter()
1074            .map(|broker| broker.into_inline_connection(&r))
1075            .collect();
1076
1077        KafkaConnection {
1078            brokers,
1079            progress_topic,
1080            progress_topic_options,
1081            default_tunnel: default_tunnel.into_inline_connection(&r),
1082            options,
1083            tls,
1084            sasl: sasl.map(|sasl| sasl.into_inline_connection(&r)),
1085        }
1086    }
1087}
1088
1089impl<C: ConnectionAccess> KafkaConnection<C> {
1090    /// Returns the name of the progress topic to use for the connection.
1091    ///
1092    /// The caller is responsible for providing the connection ID as it is not
1093    /// known to `KafkaConnection`.
1094    pub fn progress_topic(
1095        &self,
1096        connection_context: &ConnectionContext,
1097        connection_id: CatalogItemId,
1098    ) -> Cow<'_, str> {
1099        if let Some(progress_topic) = &self.progress_topic {
1100            Cow::Borrowed(progress_topic)
1101        } else {
1102            Cow::Owned(format!(
1103                "_materialize-progress-{}-{}",
1104                connection_context.environment_id, connection_id,
1105            ))
1106        }
1107    }
1108
1109    fn validate_by_default(&self) -> bool {
1110        true
1111    }
1112}
1113
1114impl KafkaConnection {
1115    /// Generates a string that can be used as the base for a configuration ID
1116    /// (e.g., `client.id`, `group.id`, `transactional.id`) for a Kafka source
1117    /// or sink.
1118    pub fn id_base(
1119        connection_context: &ConnectionContext,
1120        connection_id: CatalogItemId,
1121        object_id: GlobalId,
1122    ) -> String {
1123        format!(
1124            "materialize-{}-{}-{}",
1125            connection_context.environment_id, connection_id, object_id,
1126        )
1127    }
1128
1129    /// Enriches the provided `client_id` according to any enrichment rules in
1130    /// the `kafka_client_id_enrichment_rules` configuration parameter.
1131    pub fn enrich_client_id(&self, configs: &ConfigSet, client_id: &mut String) {
1132        #[derive(Debug, Deserialize)]
1133        struct EnrichmentRule {
1134            #[serde(deserialize_with = "deserialize_regex")]
1135            pattern: Regex,
1136            payload: String,
1137        }
1138
1139        fn deserialize_regex<'de, D>(deserializer: D) -> Result<Regex, D::Error>
1140        where
1141            D: Deserializer<'de>,
1142        {
1143            let buf = String::deserialize(deserializer)?;
1144            Regex::new(&buf).map_err(serde::de::Error::custom)
1145        }
1146
1147        let rules = KAFKA_CLIENT_ID_ENRICHMENT_RULES.get(configs);
1148        let rules = match serde_json::from_value::<Vec<EnrichmentRule>>(rules) {
1149            Ok(rules) => rules,
1150            Err(e) => {
1151                warn!(%e, "failed to decode kafka_client_id_enrichment_rules");
1152                return;
1153            }
1154        };
1155
1156        // Check every rule against every broker. Rules are matched in the order
1157        // that they are specified. It is usually a configuration error if
1158        // multiple rules match the same list of Kafka brokers, but we
1159        // nonetheless want to provide well defined semantics.
1160        debug!(?self.brokers, "evaluating client ID enrichment rules");
1161        for rule in rules {
1162            let is_match = self
1163                .brokers
1164                .iter()
1165                .any(|b| rule.pattern.is_match(&b.address));
1166            debug!(?rule, is_match, "evaluated client ID enrichment rule");
1167            if is_match {
1168                client_id.push('-');
1169                client_id.push_str(&rule.payload);
1170            }
1171        }
1172    }
1173
1174    /// Creates a Kafka client for the connection.
1175    pub async fn create_with_context<C, T>(
1176        &self,
1177        storage_configuration: &StorageConfiguration,
1178        context: C,
1179        extra_options: &BTreeMap<&str, String>,
1180        in_task: InTask,
1181    ) -> Result<T, ContextCreationError>
1182    where
1183        C: ClientContext,
1184        T: FromClientConfigAndContext<TunnelingClientContext<C>>,
1185    {
1186        let mut options = self.options.clone();
1187
1188        // Ensure that Kafka topics are *not* automatically created when
1189        // consuming, producing, or fetching metadata for a topic. This ensures
1190        // that we don't accidentally create topics with the wrong number of
1191        // partitions.
1192        options.insert("allow.auto.create.topics".into(), "false".into());
1193
1194        let brokers = match &self.default_tunnel {
1195            Tunnel::AwsPrivatelink(t) => {
1196                assert!(&self.brokers.is_empty());
1197
1198                let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM
1199                    .get(storage_configuration.config_set());
1200                options.insert("ssl.endpoint.identification.algorithm".into(), algo.into());
1201
1202                // When using a default privatelink tunnel broker/brokers cannot be specified
1203                // instead the tunnel connection_id and port are used for the initial connection.
1204                format!(
1205                    "{}:{}",
1206                    vpc_endpoint_host(
1207                        t.connection_id,
1208                        None, // Default tunnel does not support availability zones.
1209                    ),
1210                    t.port.unwrap_or(9092)
1211                )
1212            }
1213            Tunnel::AwsPrivatelinks(_pl) => {
1214                let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM
1215                    .get(storage_configuration.config_set());
1216                options.insert("ssl.endpoint.identification.algorithm".into(), algo.into());
1217
1218                if self.brokers.is_empty() {
1219                    return Err(ContextCreationError::Other(anyhow::anyhow!(
1220                        "at least one static broker is required when using BROKER or BROKERS"
1221                    )));
1222                }
1223                self.brokers.iter().map(|b| &b.address).join(",")
1224            }
1225            _ => self.brokers.iter().map(|b| &b.address).join(","),
1226        };
1227        options.insert("bootstrap.servers".into(), brokers.clone().into());
1228        let security_protocol = match (self.tls.is_some(), self.sasl.is_some()) {
1229            (false, false) => "PLAINTEXT",
1230            (true, false) => "SSL",
1231            (false, true) => "SASL_PLAINTEXT",
1232            (true, true) => "SASL_SSL",
1233        };
1234        info!(
1235            "kafka: create_with_context bootstrap.servers={brokers}, security_protocol={security_protocol}"
1236        );
1237        options.insert("security.protocol".into(), security_protocol.into());
1238        if let Some(tls) = &self.tls {
1239            if let Some(root_cert) = &tls.root_cert {
1240                options.insert("ssl.ca.pem".into(), root_cert.clone());
1241            }
1242            if let Some(identity) = &tls.identity {
1243                options.insert("ssl.key.pem".into(), StringOrSecret::Secret(identity.key));
1244                options.insert("ssl.certificate.pem".into(), identity.cert.clone());
1245            }
1246        }
1247        if let Some(sasl) = &self.sasl {
1248            options.insert("sasl.mechanisms".into(), (&sasl.mechanism).into());
1249            options.insert("sasl.username".into(), sasl.username.clone());
1250            if let Some(password) = sasl.password {
1251                options.insert("sasl.password".into(), StringOrSecret::Secret(password));
1252            }
1253        }
1254
1255        options.insert(
1256            "retry.backoff.ms".into(),
1257            KAFKA_RETRY_BACKOFF
1258                .get(storage_configuration.config_set())
1259                .as_millis()
1260                .into(),
1261        );
1262        options.insert(
1263            "retry.backoff.max.ms".into(),
1264            KAFKA_RETRY_BACKOFF_MAX
1265                .get(storage_configuration.config_set())
1266                .as_millis()
1267                .into(),
1268        );
1269        options.insert(
1270            "reconnect.backoff.ms".into(),
1271            KAFKA_RECONNECT_BACKOFF
1272                .get(storage_configuration.config_set())
1273                .as_millis()
1274                .into(),
1275        );
1276        options.insert(
1277            "reconnect.backoff.max.ms".into(),
1278            KAFKA_RECONNECT_BACKOFF_MAX
1279                .get(storage_configuration.config_set())
1280                .as_millis()
1281                .into(),
1282        );
1283
1284        let mut config = mz_kafka_util::client::create_new_client_config(
1285            storage_configuration
1286                .connection_context
1287                .librdkafka_log_level,
1288            storage_configuration.parameters.kafka_timeout_config,
1289        );
1290        for (k, v) in options {
1291            config.set(
1292                k,
1293                v.get_string(
1294                    in_task,
1295                    &storage_configuration.connection_context.secrets_reader,
1296                )
1297                .await
1298                .context("reading kafka secret")?,
1299            );
1300        }
1301        for (k, v) in extra_options {
1302            config.set(*k, v);
1303        }
1304
1305        let aws_config = match self.sasl.as_ref().and_then(|sasl| sasl.aws.as_ref()) {
1306            None => None,
1307            Some(aws) => Some(
1308                aws.connection
1309                    .load_sdk_config(
1310                        &storage_configuration.connection_context,
1311                        aws.connection_id,
1312                        in_task,
1313                        ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1314                    )
1315                    .await?,
1316            ),
1317        };
1318
1319        // TODO(roshan): Implement enforcement of external address validation once
1320        // rdkafka client has been updated to support providing multiple resolved
1321        // addresses for brokers
1322        let mut context = TunnelingClientContext::new(
1323            context,
1324            Handle::current(),
1325            storage_configuration
1326                .connection_context
1327                .ssh_tunnel_manager
1328                .clone(),
1329            storage_configuration.parameters.ssh_timeout_config,
1330            aws_config,
1331            in_task,
1332        );
1333
1334        match &self.default_tunnel {
1335            Tunnel::Direct => {
1336                // By default, don't offer a default override for broker address lookup.
1337            }
1338            Tunnel::AwsPrivatelink(pl) => {
1339                context.set_default_tunnel(TunnelConfig::StaticHost(
1340                    // Possible bug: We have been ignoring the configured port.
1341                    KafkaConnection::from_default_aws_privatelink(pl).host,
1342                ));
1343            }
1344            Tunnel::AwsPrivatelinks(pl) => {
1345                context.set_default_tunnel(TunnelConfig::Rules(
1346                    KafkaConnection::from_aws_privatelinks(pl),
1347                ));
1348            }
1349            Tunnel::Ssh(ssh_tunnel) => {
1350                let secret = storage_configuration
1351                    .connection_context
1352                    .secrets_reader
1353                    .read_in_task_if(in_task, ssh_tunnel.connection_id)
1354                    .await?;
1355                let key_pair = SshKeyPair::from_bytes(&secret)?;
1356
1357                // Ensure any ssh-bastion address we connect to is resolved to an external address.
1358                let resolved = resolve_address(
1359                    &ssh_tunnel.connection.host,
1360                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1361                )
1362                .await?;
1363                context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig {
1364                    host: resolved
1365                        .iter()
1366                        .map(|a| a.to_string())
1367                        .collect::<BTreeSet<_>>(),
1368                    port: ssh_tunnel.connection.port,
1369                    user: ssh_tunnel.connection.user.clone(),
1370                    key_pair,
1371                }));
1372            }
1373        }
1374        info!(
1375            "kafka: tunnel config set to {}",
1376            match &self.default_tunnel {
1377                Tunnel::Direct => "Direct".to_string(),
1378                Tunnel::AwsPrivatelink(_) => "AwsPrivatelink (static host)".to_string(),
1379                Tunnel::AwsPrivatelinks(pl) =>
1380                    format!("AwsPrivatelinks ({} rules)", pl.rules.len()),
1381                Tunnel::Ssh(_) => "Ssh".to_string(),
1382            }
1383        );
1384
1385        // Here, we preemptively rewrite broker addresses.
1386        // In concept, this overlaps with 'TunnelingClientContext::resolve_broker_addr'.
1387        for broker in &self.brokers {
1388            let mut addr_parts = broker.address.splitn(2, ':');
1389            let addr = BrokerAddr {
1390                host: addr_parts
1391                    .next()
1392                    .context("BROKER is not address:port")?
1393                    .into(),
1394                port: addr_parts
1395                    .next()
1396                    .unwrap_or("9092")
1397                    .parse()
1398                    .context("parsing BROKER port")?,
1399            };
1400            match &broker.tunnel {
1401                Tunnel::Direct => {
1402                    // By default, don't override broker address lookup.
1403                    //
1404                    // N.B.
1405                    //
1406                    // We _could_ pre-setup the default ssh tunnel for all known brokers here, but
1407                    // we avoid doing because:
1408                    // - Its not necessary.
1409                    // - Not doing so makes it easier to test the `FailedDefaultSshTunnel` path
1410                    // in the `TunnelingClientContext`.
1411                }
1412                Tunnel::AwsPrivatelink(aws_privatelink) => {
1413                    context.add_broker_rewrite(
1414                        addr,
1415                        KafkaConnection::from_aws_privatelink(aws_privatelink),
1416                    );
1417                }
1418                Tunnel::AwsPrivatelinks(_) => unreachable!(
1419                    "Individually predefined brokers do not use rule-based PrivateLinks routing."
1420                ),
1421                Tunnel::Ssh(ssh_tunnel) => {
1422                    // Ensure any SSH bastion address we connect to is resolved to an external address.
1423                    let ssh_host_resolved = resolve_address(
1424                        &ssh_tunnel.connection.host,
1425                        ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1426                    )
1427                    .await?;
1428                    context
1429                        .add_ssh_tunnel(
1430                            addr,
1431                            SshTunnelConfig {
1432                                host: ssh_host_resolved
1433                                    .iter()
1434                                    .map(|a| a.to_string())
1435                                    .collect::<BTreeSet<_>>(),
1436                                port: ssh_tunnel.connection.port,
1437                                user: ssh_tunnel.connection.user.clone(),
1438                                key_pair: SshKeyPair::from_bytes(
1439                                    &storage_configuration
1440                                        .connection_context
1441                                        .secrets_reader
1442                                        .read_in_task_if(in_task, ssh_tunnel.connection_id)
1443                                        .await?,
1444                                )?,
1445                            },
1446                        )
1447                        .await
1448                        .map_err(ContextCreationError::Ssh)?;
1449                }
1450            }
1451        }
1452
1453        Ok(config.create_with_context(context)?)
1454    }
1455
1456    async fn validate(
1457        &self,
1458        _id: CatalogItemId,
1459        storage_configuration: &StorageConfiguration,
1460    ) -> Result<(), anyhow::Error> {
1461        let (context, error_rx) = MzClientContext::with_errors();
1462        let consumer: BaseConsumer<_> = self
1463            .create_with_context(
1464                storage_configuration,
1465                context,
1466                &BTreeMap::new(),
1467                // We are in a normal tokio context during validation, already.
1468                InTask::No,
1469            )
1470            .await?;
1471        let consumer = Arc::new(consumer);
1472
1473        let timeout = storage_configuration
1474            .parameters
1475            .kafka_timeout_config
1476            .fetch_metadata_timeout;
1477
1478        // librdkafka doesn't expose an API for determining whether a connection to
1479        // the Kafka cluster has been successfully established. So we make a
1480        // metadata request, though we don't care about the results, so that we can
1481        // report any errors making that request. If the request succeeds, we know
1482        // we were able to contact at least one broker, and that's a good proxy for
1483        // being able to contact all the brokers in the cluster.
1484        //
1485        // The downside of this approach is it produces a generic error message like
1486        // "metadata fetch error" with no additional details. The real networking
1487        // error is buried in the librdkafka logs, which are not visible to users.
1488        info!("kafka: starting connection validation via fetch_metadata (timeout={timeout:?})");
1489        let result = mz_ore::task::spawn_blocking(|| "kafka_get_metadata", {
1490            let consumer = Arc::clone(&consumer);
1491            move || consumer.fetch_metadata(None, timeout)
1492        })
1493        .await;
1494        info!(
1495            "kafka: connection validation result: {}",
1496            if result.is_ok() { "success" } else { "failed" },
1497        );
1498        match result {
1499            Ok(_) => Ok(()),
1500            // The error returned by `fetch_metadata` does not provide any details which makes for
1501            // a crappy user facing error message. For this reason we attempt to grab a better
1502            // error message from the client context, which should contain any error logs emitted
1503            // by librdkafka, and fallback to the generic error if there is nothing there.
1504            Err(err) => {
1505                // Multiple errors might have been logged during this validation but some are more
1506                // relevant than others. Specifically, we prefer non-internal errors over internal
1507                // errors since those give much more useful information to the users.
1508                let main_err = error_rx.try_iter().reduce(|cur, new| match cur {
1509                    MzKafkaError::Internal(_) => new,
1510                    _ => cur,
1511                });
1512
1513                // Don't drop the consumer until after we've drained the errors
1514                // channel. Dropping the consumer can introduce spurious errors.
1515                // See database-issues#7432.
1516                drop(consumer);
1517
1518                match main_err {
1519                    Some(err) => Err(err.into()),
1520                    None => Err(err.into()),
1521                }
1522            }
1523        }
1524    }
1525
1526    /// The "default" PrivateLink connection is used for bootstrapping Kafka.
1527    fn from_default_aws_privatelink(pl: &AwsPrivatelink) -> BrokerRewrite {
1528        BrokerRewrite {
1529            host: vpc_endpoint_host(
1530                pl.connection_id,
1531                None, // Default tunnel does not support availability zones.
1532            ),
1533            port: pl.port,
1534        }
1535    }
1536
1537    /// The "not default" PrivateLink connections are used for routing to specific Kafka brokers.
1538    fn from_aws_privatelink(pl: &AwsPrivatelink) -> BrokerRewrite {
1539        BrokerRewrite {
1540            host: vpc_endpoint_host(pl.connection_id, pl.availability_zone.as_deref()),
1541            port: pl.port,
1542        }
1543    }
1544
1545    fn from_aws_privatelink_rule(
1546        AwsPrivatelinkRule { pattern, to }: &AwsPrivatelinkRule,
1547    ) -> (mz_kafka_util::client::ConnectionRulePattern, BrokerRewrite) {
1548        (
1549            mz_kafka_util::client::ConnectionRulePattern {
1550                prefix_wildcard: pattern.prefix_wildcard,
1551                literal_match: pattern.literal_match.clone(),
1552                suffix_wildcard: pattern.suffix_wildcard,
1553            },
1554            KafkaConnection::from_aws_privatelink(to),
1555        )
1556    }
1557
1558    fn from_aws_privatelinks(pl: &AwsPrivatelinks) -> HostMappingRules {
1559        HostMappingRules {
1560            rules: pl
1561                .rules
1562                .iter()
1563                .map(KafkaConnection::from_aws_privatelink_rule)
1564                .collect_vec(),
1565        }
1566    }
1567}
1568
1569impl<C: ConnectionAccess> AlterCompatible for KafkaConnection<C> {
1570    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1571        let KafkaConnection {
1572            brokers: _,
1573            default_tunnel: _,
1574            progress_topic,
1575            progress_topic_options,
1576            options: _,
1577            tls: _,
1578            sasl: _,
1579        } = self;
1580
1581        let compatibility_checks = [
1582            (progress_topic == &other.progress_topic, "progress_topic"),
1583            (
1584                progress_topic_options == &other.progress_topic_options,
1585                "progress_topic_options",
1586            ),
1587        ];
1588
1589        for (compatible, field) in compatibility_checks {
1590            if !compatible {
1591                tracing::warn!(
1592                    "KafkaConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1593                    self,
1594                    other
1595                );
1596
1597                return Err(AlterError { id });
1598            }
1599        }
1600
1601        Ok(())
1602    }
1603}
1604
/// A connection to a Confluent Schema Registry.
///
/// `C` only affects `tunnel`: converting from `ReferencedConnection` to the
/// inlined form rewrites the tunnel and carries every other field over as-is.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnection<C: ConnectionAccess = InlinedConnection> {
    /// The URL of the schema registry.
    ///
    /// Must contain a host; connecting fails with "url missing host"
    /// otherwise.
    pub url: Url,
    /// Trusted root TLS certificate in PEM format.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication with the schema
    /// registry.
    pub tls_identity: Option<TlsIdentity>,
    /// Optional HTTP authentication credentials for the schema registry.
    pub http_auth: Option<CsrConnectionHttpAuth>,
    /// A tunnel through which to route traffic.
    ///
    /// This is the only field checked by this type's `AlterCompatible` impl.
    pub tunnel: Tunnel<C>,
}
1620
1621impl<R: ConnectionResolver> IntoInlineConnection<CsrConnection, R>
1622    for CsrConnection<ReferencedConnection>
1623{
1624    fn into_inline_connection(self, r: R) -> CsrConnection {
1625        let CsrConnection {
1626            url,
1627            tls_root_cert,
1628            tls_identity,
1629            http_auth,
1630            tunnel,
1631        } = self;
1632        CsrConnection {
1633            url,
1634            tls_root_cert,
1635            tls_identity,
1636            http_auth,
1637            tunnel: tunnel.into_inline_connection(r),
1638        }
1639    }
1640}
1641
impl<C: ConnectionAccess> CsrConnection<C> {
    /// Always returns `true`: schema registry connections opt in to
    /// validation by default.
    ///
    /// NOTE(review): presumably consulted when the user does not specify
    /// `VALIDATE`; confirm at the call site.
    fn validate_by_default(&self) -> bool {
        true
    }
}
1647
impl CsrConnection {
    /// Constructs a schema registry client from the connection.
    ///
    /// The root certificate, TLS client key and certificate, and HTTP
    /// username/password are read through the connection context's secrets
    /// reader; `in_task` is forwarded to each of those reads and to the SSH
    /// tunnel setup.
    ///
    /// How the registry's host is reached depends on `self.tunnel`:
    ///
    /// - `Direct`: the host is resolved up front via `resolve_address`,
    ///   honoring `ENFORCE_EXTERNAL_ADDRESSES`, and the client is pinned to
    ///   the resolved addresses.
    /// - `Ssh`: an SSH tunnel to the host is opened and the client is routed
    ///   through the tunnel's local address, while TLS verification still
    ///   uses the remote hostname.
    /// - `AwsPrivatelink`: the client is pinned to the addresses of the VPC
    ///   endpoint host for the PrivateLink connection.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the following fails:
    /// - reading a secret;
    /// - parsing a certificate or identity;
    /// - the URL check (it has no host);
    /// - address resolution or SSH tunnel setup;
    /// - building the client.
    ///
    /// # Panics
    ///
    /// Panics if the tunnel is `AwsPrivatelinks` (only supported for Kafka),
    /// or if an `AwsPrivatelink` tunnel specifies a port.
    pub async fn connect(
        &self,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_ccsr::Client, CsrConnectError> {
        let mut client_config = mz_ccsr::ClientConfig::new(self.url.clone());
        if let Some(root_cert) = &self.tls_root_cert {
            let root_cert = root_cert
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let root_cert = Certificate::from_pem(root_cert.as_bytes())?;
            client_config = client_config.add_root_certificate(root_cert);
        }

        if let Some(tls_identity) = &self.tls_identity {
            // The private key always lives in a secret; the certificate may
            // be given inline or as a secret.
            let key = &storage_configuration
                .connection_context
                .secrets_reader
                .read_string_in_task_if(in_task, tls_identity.key)
                .await?;
            let cert = tls_identity
                .cert
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let ident = Identity::from_pem(key.as_bytes(), cert.as_bytes())?;
            client_config = client_config.identity(ident);
        }

        if let Some(http_auth) = &self.http_auth {
            let username = http_auth
                .username
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            // A missing password secret is passed through as `None`.
            let password = match http_auth.password {
                None => None,
                Some(password) => Some(
                    storage_configuration
                        .connection_context
                        .secrets_reader
                        .read_string_in_task_if(in_task, password)
                        .await?,
                ),
            };
            client_config = client_config.auth(username, password);
        }

        // TODO: use types to enforce that the URL has a string hostname.
        let host = self
            .url
            .host_str()
            .ok_or_else(|| anyhow!("url missing host"))?;
        match &self.tunnel {
            Tunnel::Direct => {
                // Ensure any host we connect to is resolved to an external address.
                // The resolved IPs are pinned with a placeholder port of 0.
                // NOTE(review): presumably the client then uses the URL's
                // port; confirm against `mz_ccsr::ClientConfig::resolve_to_addrs`.
                let resolved = resolve_address(
                    host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                client_config = client_config.resolve_to_addrs(
                    host,
                    &resolved
                        .iter()
                        .map(|addr| SocketAddr::new(*addr, 0))
                        .collect::<Vec<_>>(),
                )
            }
            Tunnel::Ssh(ssh_tunnel) => {
                let ssh_tunnel = ssh_tunnel
                    .connect(
                        storage_configuration,
                        host,
                        // Honor the URL scheme's default port (443 for https,
                        // 80 for http) if no explicit port was provided.
                        self.url.port_or_known_default().unwrap_or(80),
                        in_task,
                    )
                    .await
                    .map_err(CsrConnectError::Ssh)?;

                // Carefully inject the SSH tunnel into the client
                // configuration. This is delicate because we need TLS
                // verification to continue to use the remote hostname rather
                // than the tunnel hostname.

                client_config = client_config
                    // `resolve_to_addrs` allows us to rewrite the hostname
                    // at the DNS level, which means the TCP connection is
                    // correctly routed through the tunnel, but TLS verification
                    // is still performed against the remote hostname.
                    // Unfortunately the port here is ignored if the URL also
                    // specifies a port...
                    .resolve_to_addrs(host, &[SocketAddr::new(ssh_tunnel.local_addr().ip(), 0)])
                    // ...so we also dynamically rewrite the URL to use the
                    // current port for the SSH tunnel.
                    //
                    // WARNING: this is brittle, because we only dynamically
                    // update the client configuration with the tunnel *port*,
                    // and not the hostname. This works fine in practice, because
                    // only the SSH tunnel port will change if the tunnel fails
                    // and has to be restarted (the hostname is always
                    // 127.0.0.1)--but this is an implementation detail of
                    // the SSH tunnel code that we're relying on.
                    .dynamic_url({
                        let remote_url = self.url.clone();
                        move || {
                            let mut url = remote_url.clone();
                            url.set_port(Some(ssh_tunnel.local_addr().port()))
                                .expect("cannot fail");
                            url
                        }
                    });
            }
            Tunnel::AwsPrivatelink(connection) => {
                // Schema registry PrivateLink tunnels never carry a port.
                assert_none!(connection.port);

                let privatelink_host = mz_cloud_resources::vpc_endpoint_host(
                    connection.connection_id,
                    connection.availability_zone.as_deref(),
                );
                // Route the registry's hostname to the VPC endpoint's
                // addresses, keeping the original hostname for TLS.
                let addrs: Vec<_> = net::lookup_host((privatelink_host, 0))
                    .await
                    .context("resolving PrivateLink host")?
                    .collect();
                client_config = client_config.resolve_to_addrs(host, &addrs)
            }
            Tunnel::AwsPrivatelinks(_) => {
                unreachable!("MATCHING broker rules are only available for Kafka connections.");
            }
        }

        Ok(client_config.build()?)
    }

    /// Validates the connection by building a client and listing the
    /// registry's subjects. Any failure along the way is returned as the
    /// validation error.
    async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<(), anyhow::Error> {
        let client = self
            .connect(
                storage_configuration,
                // We are in a normal tokio context during validation, already.
                InTask::No,
            )
            .await?;
        client.list_subjects().await?;
        Ok(())
    }
}
1809
1810impl<C: ConnectionAccess> AlterCompatible for CsrConnection<C> {
1811    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1812        let CsrConnection {
1813            tunnel,
1814            // All non-tunnel fields may change
1815            url: _,
1816            tls_root_cert: _,
1817            tls_identity: _,
1818            http_auth: _,
1819        } = self;
1820
1821        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1822
1823        for (compatible, field) in compatibility_checks {
1824            if !compatible {
1825                tracing::warn!(
1826                    "CsrConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1827                    self,
1828                    other
1829                );
1830
1831                return Err(AlterError { id });
1832            }
1833        }
1834        Ok(())
1835    }
1836}
1837
/// A connection to an AWS Glue Schema Registry.
///
/// AWS credentials, region, and endpoint are inherited from the referenced
/// [`AwsConnection`]; this struct only carries the per-registry settings.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct GlueSchemaRegistryConnection<C: ConnectionAccess = InlinedConnection> {
    /// The referenced AWS connection that supplies credentials, region, and
    /// (optional) endpoint.
    pub aws_connection: AwsConnectionReference<C>,
    /// The Glue Schema Registry name within the AWS account/region.
    ///
    /// Validation looks this registry up by name, and it may not change when
    /// the connection is altered.
    pub registry_name: String,
}
1850
1851impl<R: ConnectionResolver> IntoInlineConnection<GlueSchemaRegistryConnection, R>
1852    for GlueSchemaRegistryConnection<ReferencedConnection>
1853{
1854    fn into_inline_connection(self, r: R) -> GlueSchemaRegistryConnection {
1855        let GlueSchemaRegistryConnection {
1856            aws_connection,
1857            registry_name,
1858        } = self;
1859        GlueSchemaRegistryConnection {
1860            aws_connection: aws_connection.into_inline_connection(&r),
1861            registry_name,
1862        }
1863    }
1864}
1865
impl<C: ConnectionAccess> GlueSchemaRegistryConnection<C> {
    /// Always returns `true`: Glue Schema Registry connections are validated
    /// unless the user opts out.
    fn validate_by_default(&self) -> bool {
        // Matches CSR: default-validate so a bad registry name fails at
        // `CREATE CONNECTION` rather than surfacing later on first use.
        // Users can still opt out with `WITH (VALIDATE = false)`.
        true
    }
}
1874
1875impl GlueSchemaRegistryConnection {
1876    async fn validate(
1877        &self,
1878        _id: CatalogItemId,
1879        storage_configuration: &StorageConfiguration,
1880    ) -> Result<(), anyhow::Error> {
1881        let enforce_external_addresses =
1882            crate::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set());
1883        let sdk_config = self
1884            .aws_connection
1885            .connection
1886            .load_sdk_config(
1887                &storage_configuration.connection_context,
1888                self.aws_connection.connection_id,
1889                // We are in a normal tokio context during validation.
1890                InTask::No,
1891                enforce_external_addresses,
1892            )
1893            .await?;
1894        let client = mz_aws_glue_schema_registry::ClientConfig::new(sdk_config).build();
1895        match client.get_registry(&self.registry_name).await {
1896            Ok(_) => Ok(()),
1897            Err(mz_aws_glue_schema_registry::GetRegistryError::NotFound) => Err(anyhow!(
1898                "AWS Glue Schema Registry {:?} does not exist in the configured account/region",
1899                self.registry_name
1900            )),
1901            Err(err) => Err(anyhow::Error::new(err).context(format!(
1902                "failed to validate AWS Glue Schema Registry connection (registry={:?})",
1903                self.registry_name
1904            ))),
1905        }
1906    }
1907}
1908
1909impl<C: ConnectionAccess> AlterCompatible for GlueSchemaRegistryConnection<C> {
1910    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1911        let GlueSchemaRegistryConnection {
1912            registry_name,
1913            // The referenced AWS connection itself may be swapped; matches
1914            // the permissive policy of MySqlConnection / SqlServerConnection.
1915            aws_connection: _,
1916        } = self;
1917
1918        let compatibility_checks = [(registry_name == &other.registry_name, "registry_name")];
1919
1920        for (compatible, field) in compatibility_checks {
1921            if !compatible {
1922                tracing::warn!(
1923                    "GlueSchemaRegistryConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1924                    self,
1925                    other
1926                );
1927
1928                return Err(AlterError { id });
1929            }
1930        }
1931        Ok(())
1932    }
1933}
1934
/// A TLS key pair used for client identity.
///
/// Used as the optional client certificate of schema registry, PostgreSQL,
/// and MySQL connections.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct TlsIdentity {
    /// The client's TLS public certificate in PEM format, given either inline
    /// or as a secret.
    pub cert: StringOrSecret,
    /// The ID of the secret containing the client's TLS private key in PEM
    /// format.
    pub key: CatalogItemId,
}
1944
/// HTTP authentication credentials in a [`CsrConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnectionHttpAuth {
    /// The username, given either inline or as a secret.
    pub username: StringOrSecret,
    /// The ID of the secret containing the password, if any.
    ///
    /// When `None`, the client is configured with a username and no password.
    pub password: Option<CatalogItemId>,
}
1953
/// A connection to a PostgreSQL server.
///
/// `C` only affects `tunnel`: converting from `ReferencedConnection` to the
/// inlined form rewrites the tunnel and carries every other field over as-is.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct PostgresConnection<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// The name of the database to connect to.
    pub database: String,
    /// The username to authenticate as, given either inline or as a secret.
    pub user: StringOrSecret,
    /// The ID of the secret holding the password, if password authentication
    /// is used.
    pub password: Option<CatalogItemId>,
    /// A tunnel through which to route traffic.
    ///
    /// This is the only field checked by this type's `AlterCompatible` impl.
    pub tunnel: Tunnel<C>,
    /// Whether to use TLS for encryption, authentication, or both.
    pub tls_mode: SslMode,
    /// An optional root TLS certificate in PEM format, to verify the server's
    /// identity.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication.
    pub tls_identity: Option<TlsIdentity>,
}
1977
1978impl<R: ConnectionResolver> IntoInlineConnection<PostgresConnection, R>
1979    for PostgresConnection<ReferencedConnection>
1980{
1981    fn into_inline_connection(self, r: R) -> PostgresConnection {
1982        let PostgresConnection {
1983            host,
1984            port,
1985            database,
1986            user,
1987            password,
1988            tunnel,
1989            tls_mode,
1990            tls_root_cert,
1991            tls_identity,
1992        } = self;
1993
1994        PostgresConnection {
1995            host,
1996            port,
1997            database,
1998            user,
1999            password,
2000            tunnel: tunnel.into_inline_connection(r),
2001            tls_mode,
2002            tls_root_cert,
2003            tls_identity,
2004        }
2005    }
2006}
2007
impl<C: ConnectionAccess> PostgresConnection<C> {
    /// Always returns `true`: PostgreSQL connections opt in to validation by
    /// default.
    ///
    /// NOTE(review): presumably consulted when the user does not specify
    /// `VALIDATE`; confirm at the call site.
    fn validate_by_default(&self) -> bool {
        true
    }
}
2013
impl PostgresConnection<InlinedConnection> {
    /// Builds the `mz_postgres_util::Config` used to connect to this server.
    ///
    /// Credentials and certificates are read through `secrets_reader`, with
    /// `in_task` forwarded to each read. Client-side connection tuning comes
    /// from the `pg_source_*` storage parameters. Server-side session
    /// settings are sent through the `options` startup parameter.
    ///
    /// # Errors
    ///
    /// Returns an error if a secret cannot be read, the SSH key pair cannot
    /// be parsed, address resolution fails, or the final config cannot be
    /// constructed.
    ///
    /// # Panics
    ///
    /// Panics if the tunnel is `AwsPrivatelinks` (only supported for Kafka),
    /// or if an `AwsPrivatelink` tunnel specifies a port.
    pub async fn config(
        &self,
        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_postgres_util::Config, anyhow::Error> {
        let params = &storage_configuration.parameters;

        let mut config = tokio_postgres::Config::new();
        config
            .host(&self.host)
            .port(self.port)
            .dbname(&self.database)
            .user(&self.user.get_string(in_task, secrets_reader).await?)
            .ssl_mode(self.tls_mode);
        if let Some(password) = self.password {
            let password = secrets_reader
                .read_string_in_task_if(in_task, password)
                .await?;
            config.password(password);
        }
        if let Some(tls_root_cert) = &self.tls_root_cert {
            let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
            config.ssl_root_cert(tls_root_cert.as_bytes());
        }
        if let Some(tls_identity) = &self.tls_identity {
            let cert = tls_identity
                .cert
                .get_string(in_task, secrets_reader)
                .await?;
            let key = secrets_reader
                .read_string_in_task_if(in_task, tls_identity.key)
                .await?;
            config.ssl_cert(cert.as_bytes()).ssl_key(key.as_bytes());
        }

        // Client-side timeouts and TCP keepalive settings; each is applied
        // only when the corresponding storage parameter is set.
        if let Some(connect_timeout) = params.pg_source_connect_timeout {
            config.connect_timeout(connect_timeout);
        }
        if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
            config.keepalives_retries(keepalives_retries);
        }
        if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
            config.keepalives_idle(keepalives_idle);
        }
        if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
            config.keepalives_interval(keepalives_interval);
        }
        if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
            config.tcp_user_timeout(tcp_user_timeout);
        }

        // Server-side session settings, passed as `--name=value` entries in
        // the `options` startup parameter. `wal_sender_timeout` is sent
        // whenever it is set. The TCP keepalive/user-timeout settings are
        // mirrored to the server only when `pg_source_tcp_configure_server`
        // is enabled. Durations are sent as integers: `as_millis` for the
        // timeouts, and `as_secs` (which truncates any sub-second remainder)
        // for the keepalive idle/interval.
        let mut options = vec![];
        if let Some(wal_sender_timeout) = params.pg_source_wal_sender_timeout {
            options.push(format!(
                "--wal_sender_timeout={}",
                wal_sender_timeout.as_millis()
            ));
        };
        if params.pg_source_tcp_configure_server {
            if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
                options.push(format!("--tcp_keepalives_count={}", keepalives_retries));
            }
            if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
                options.push(format!(
                    "--tcp_keepalives_idle={}",
                    keepalives_idle.as_secs()
                ));
            }
            if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
                options.push(format!(
                    "--tcp_keepalives_interval={}",
                    keepalives_interval.as_secs()
                ));
            }
            if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
                options.push(format!(
                    "--tcp_user_timeout={}",
                    tcp_user_timeout.as_millis()
                ));
            }
        }
        config.options(options.join(" ").as_str());

        let tunnel = match &self.tunnel {
            Tunnel::Direct => {
                // Ensure any host we connect to is resolved to an external address.
                let resolved = resolve_address(
                    &self.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_postgres_util::TunnelConfig::Direct {
                    resolved_ips: Some(resolved),
                }
            }
            Tunnel::Ssh(SshTunnel {
                connection_id,
                connection,
            }) => {
                // The SSH key pair is read from the secret stored under the
                // SSH connection's own ID.
                let secret = secrets_reader
                    .read_in_task_if(in_task, *connection_id)
                    .await?;
                let key_pair = SshKeyPair::from_bytes(&secret)?;
                // Ensure any ssh-bastion host we connect to is resolved to an external address.
                let resolved = resolve_address(
                    &connection.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_postgres_util::TunnelConfig::Ssh {
                    config: SshTunnelConfig {
                        host: resolved
                            .iter()
                            .map(|a| a.to_string())
                            .collect::<BTreeSet<_>>(),
                        port: connection.port,
                        user: connection.user.clone(),
                        key_pair,
                    },
                }
            }
            Tunnel::AwsPrivatelink(connection) => {
                // PostgreSQL PrivateLink tunnels never carry a port.
                assert_none!(connection.port);
                mz_postgres_util::TunnelConfig::AwsPrivatelink {
                    connection_id: connection.connection_id,
                }
            }
            Tunnel::AwsPrivatelinks(_) => {
                unreachable!("MATCHING broker rules are only available for Kafka connections.");
            }
        };

        Ok(mz_postgres_util::Config::new(
            config,
            tunnel,
            params.ssh_timeout_config,
            in_task,
        )?)
    }

    /// Connects to the server and checks that it supports logical
    /// replication.
    ///
    /// The checks are:
    /// - `wal_level` is at least `logical`;
    /// - `max_wal_senders` is at least 1;
    /// - at least 2 replication slots are available.
    ///
    /// On success, returns the connected client.
    ///
    /// # Errors
    ///
    /// Returns connection/query errors as-is, and a
    /// `PostgresConnectionValidationError` for each failed check.
    pub async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<mz_postgres_util::Client, anyhow::Error> {
        let config = self
            .config(
                &storage_configuration.connection_context.secrets_reader,
                storage_configuration,
                // We are in a normal tokio context during validation, already.
                InTask::No,
            )
            .await?;
        let client = config
            .connect(
                "connection validation",
                &storage_configuration.connection_context.ssh_tunnel_manager,
            )
            .await?;

        let wal_level = mz_postgres_util::get_wal_level(&client).await?;

        if wal_level < mz_postgres_util::replication::WalLevel::Logical {
            Err(PostgresConnectionValidationError::InsufficientWalLevel { wal_level })?;
        }

        let max_wal_senders = mz_postgres_util::get_max_wal_senders(&client).await?;

        if max_wal_senders < 1 {
            Err(PostgresConnectionValidationError::ReplicationDisabled)?;
        }

        let available_replication_slots =
            mz_postgres_util::available_replication_slots(&client).await?;

        // We need 1 replication slot for the snapshots and 1 for the continuing replication
        if available_replication_slots < 2 {
            Err(
                PostgresConnectionValidationError::InsufficientReplicationSlotsAvailable {
                    count: 2,
                },
            )?;
        }

        Ok(client)
    }
}
2203
/// Reasons a reachable PostgreSQL server fails `PostgresConnection::validate`.
#[derive(Debug, Clone, thiserror::Error)]
pub enum PostgresConnectionValidationError {
    /// Fewer replication slots are available than required; `count` is the
    /// number of slots the statement needs.
    #[error("PostgreSQL server has insufficient number of replication slots available")]
    InsufficientReplicationSlotsAvailable { count: usize },
    /// The server's `wal_level` is below `logical`; carries the observed level.
    #[error("server must have wal_level >= logical, but has {wal_level}")]
    InsufficientWalLevel {
        wal_level: mz_postgres_util::replication::WalLevel,
    },
    /// The server's `max_wal_senders` is less than 1.
    #[error("replication disabled on server")]
    ReplicationDisabled,
}
2215
2216impl PostgresConnectionValidationError {
2217    pub fn detail(&self) -> Option<String> {
2218        match self {
2219            Self::InsufficientReplicationSlotsAvailable { count } => Some(format!(
2220                "executing this statement requires {} replication slot{}",
2221                count,
2222                if *count == 1 { "" } else { "s" }
2223            )),
2224            _ => None,
2225        }
2226    }
2227
2228    pub fn hint(&self) -> Option<String> {
2229        match self {
2230            Self::InsufficientReplicationSlotsAvailable { .. } => Some(
2231                "you might be able to wait for other sources to finish snapshotting and try again"
2232                    .into(),
2233            ),
2234            Self::ReplicationDisabled => Some("set max_wal_senders to a value > 0".into()),
2235            Self::InsufficientWalLevel { .. } => None,
2236        }
2237    }
2238}
2239
2240impl<C: ConnectionAccess> AlterCompatible for PostgresConnection<C> {
2241    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2242        let PostgresConnection {
2243            tunnel,
2244            // All non-tunnel options may change arbitrarily
2245            host: _,
2246            port: _,
2247            database: _,
2248            user: _,
2249            password: _,
2250            tls_mode: _,
2251            tls_root_cert: _,
2252            tls_identity: _,
2253        } = self;
2254
2255        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2256
2257        for (compatible, field) in compatibility_checks {
2258            if !compatible {
2259                tracing::warn!(
2260                    "PostgresConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2261                    self,
2262                    other
2263                );
2264
2265                return Err(AlterError { id });
2266            }
2267        }
2268        Ok(())
2269    }
2270}
2271
/// Specifies how to tunnel a connection.
///
/// `C` only affects the `Ssh` variant, which is the only one rewritten when
/// converting from `ReferencedConnection` to the inlined form.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Tunnel<C: ConnectionAccess = InlinedConnection> {
    /// No tunneling.
    Direct,
    /// Via the specified SSH tunnel connection.
    Ssh(SshTunnel<C>),
    /// Via the specified AWS PrivateLink connection.
    AwsPrivatelink(AwsPrivatelink),
    /// Via several AWS PrivateLink connections, selected per broker by
    /// pattern rules (the `MATCHING` broker rules).
    ///
    /// Only Kafka connections support this variant; the schema registry and
    /// PostgreSQL connection code treats it as unreachable.
    AwsPrivatelinks(AwsPrivatelinks),
}
2283
2284impl<R: ConnectionResolver> IntoInlineConnection<Tunnel, R> for Tunnel<ReferencedConnection> {
2285    fn into_inline_connection(self, r: R) -> Tunnel {
2286        match self {
2287            Tunnel::Direct => Tunnel::Direct,
2288            Tunnel::Ssh(ssh) => Tunnel::Ssh(ssh.into_inline_connection(r)),
2289            Tunnel::AwsPrivatelink(awspl) => Tunnel::AwsPrivatelink(awspl),
2290            Tunnel::AwsPrivatelinks(x) => Tunnel::AwsPrivatelinks(x),
2291        }
2292    }
2293}
2294
2295impl<C: ConnectionAccess> AlterCompatible for Tunnel<C> {
2296    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2297        let compatible = match (self, other) {
2298            (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o).is_ok(),
2299            (s, o) => s == o,
2300        };
2301
2302        if !compatible {
2303            tracing::warn!(
2304                "Tunnel incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
2305                self,
2306                other
2307            );
2308
2309            return Err(AlterError { id });
2310        }
2311
2312        Ok(())
2313    }
2314}
2315
/// Specifies which MySQL SSL Mode to use:
/// <https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-mode>
/// This is not available as an enum in the mysql-async crate, so we define our own.
///
/// NOTE(review): the variants appear to mirror MySQL's `DISABLED`,
/// `REQUIRED`, `VERIFY_CA`, and `VERIFY_IDENTITY` modes; confirm against the
/// code that maps them onto the client options.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum MySqlSslMode {
    Disabled,
    Required,
    VerifyCa,
    VerifyIdentity,
}
2326
2327/// A connection to a MySQL server.
2328#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2329pub struct MySqlConnection<C: ConnectionAccess = InlinedConnection> {
2330    /// The hostname of the server.
2331    pub host: String,
2332    /// The port of the server.
2333    pub port: u16,
2334    /// The username to authenticate as.
2335    pub user: StringOrSecret,
2336    /// An optional password for authentication.
2337    pub password: Option<CatalogItemId>,
2338    /// A tunnel through which to route traffic.
2339    pub tunnel: Tunnel<C>,
2340    /// Whether to use TLS for encryption, verify the server's certificate, and identity.
2341    pub tls_mode: MySqlSslMode,
2342    /// An optional root TLS certificate in PEM format, to verify the server's
2343    /// identity.
2344    pub tls_root_cert: Option<StringOrSecret>,
2345    /// An optional TLS client certificate for authentication.
2346    pub tls_identity: Option<TlsIdentity>,
2347    /// Reference to the AWS connection information to be used for IAM authenitcation and
2348    /// assuming AWS roles.
2349    pub aws_connection: Option<AwsConnectionReference<C>>,
2350}
2351
2352impl<R: ConnectionResolver> IntoInlineConnection<MySqlConnection, R>
2353    for MySqlConnection<ReferencedConnection>
2354{
2355    fn into_inline_connection(self, r: R) -> MySqlConnection {
2356        let MySqlConnection {
2357            host,
2358            port,
2359            user,
2360            password,
2361            tunnel,
2362            tls_mode,
2363            tls_root_cert,
2364            tls_identity,
2365            aws_connection,
2366        } = self;
2367
2368        MySqlConnection {
2369            host,
2370            port,
2371            user,
2372            password,
2373            tunnel: tunnel.into_inline_connection(&r),
2374            tls_mode,
2375            tls_root_cert,
2376            tls_identity,
2377            aws_connection: aws_connection.map(|aws| aws.into_inline_connection(&r)),
2378        }
2379    }
2380}
2381
2382impl<C: ConnectionAccess> MySqlConnection<C> {
2383    fn validate_by_default(&self) -> bool {
2384        true
2385    }
2386}
2387
2388impl MySqlConnection<InlinedConnection> {
2389    pub async fn config(
2390        &self,
2391        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
2392        storage_configuration: &StorageConfiguration,
2393        in_task: InTask,
2394    ) -> Result<mz_mysql_util::Config, anyhow::Error> {
2395        // TODO(roshan): Set appropriate connection timeouts
2396        let mut opts = mysql_async::OptsBuilder::default()
2397            .ip_or_hostname(&self.host)
2398            .tcp_port(self.port)
2399            .user(Some(&self.user.get_string(in_task, secrets_reader).await?));
2400
2401        if let Some(password) = self.password {
2402            let password = secrets_reader
2403                .read_string_in_task_if(in_task, password)
2404                .await?;
2405            opts = opts.pass(Some(password));
2406        }
2407
2408        // Our `MySqlSslMode` enum matches the official MySQL Client `--ssl-mode` parameter values
2409        // which uses opt-in security features (SSL, CA verification, & Identity verification).
2410        // The mysql_async crate `SslOpts` struct uses an opt-out mechanism for each of these, so
2411        // we need to appropriately disable features to match the intent of each enum value.
2412        let mut ssl_opts = match self.tls_mode {
2413            MySqlSslMode::Disabled => None,
2414            MySqlSslMode::Required => Some(
2415                mysql_async::SslOpts::default()
2416                    .with_danger_accept_invalid_certs(true)
2417                    .with_danger_skip_domain_validation(true),
2418            ),
2419            MySqlSslMode::VerifyCa => {
2420                Some(mysql_async::SslOpts::default().with_danger_skip_domain_validation(true))
2421            }
2422            MySqlSslMode::VerifyIdentity => Some(mysql_async::SslOpts::default()),
2423        };
2424
2425        if matches!(
2426            self.tls_mode,
2427            MySqlSslMode::VerifyCa | MySqlSslMode::VerifyIdentity
2428        ) {
2429            if let Some(tls_root_cert) = &self.tls_root_cert {
2430                let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
2431                ssl_opts = ssl_opts.map(|opts| {
2432                    opts.with_root_certs(vec![tls_root_cert.as_bytes().to_vec().into()])
2433                });
2434            }
2435        }
2436
2437        if let Some(identity) = &self.tls_identity {
2438            let key = secrets_reader
2439                .read_string_in_task_if(in_task, identity.key)
2440                .await?;
2441            let cert = identity.cert.get_string(in_task, secrets_reader).await?;
2442            let (der, pass) =
2443                mz_tls_util::pkcs12der_from_pem(key.as_bytes(), cert.as_bytes())?.into_parts();
2444
2445            // Add client identity to SSLOpts
2446            ssl_opts = ssl_opts.map(|opts| {
2447                opts.with_client_identity(Some(
2448                    mysql_async::ClientIdentity::new(der.into()).with_password(pass),
2449                ))
2450            });
2451        }
2452
2453        opts = opts.ssl_opts(ssl_opts);
2454
2455        let tunnel = match &self.tunnel {
2456            Tunnel::Direct => {
2457                // Ensure any host we connect to is resolved to an external address.
2458                let resolved = resolve_address(
2459                    &self.host,
2460                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2461                )
2462                .await?;
2463                mz_mysql_util::TunnelConfig::Direct {
2464                    resolved_ips: Some(resolved),
2465                }
2466            }
2467            Tunnel::Ssh(SshTunnel {
2468                connection_id,
2469                connection,
2470            }) => {
2471                let secret = secrets_reader
2472                    .read_in_task_if(in_task, *connection_id)
2473                    .await?;
2474                let key_pair = SshKeyPair::from_bytes(&secret)?;
2475                // Ensure any ssh-bastion host we connect to is resolved to an external address.
2476                let resolved = resolve_address(
2477                    &connection.host,
2478                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2479                )
2480                .await?;
2481                mz_mysql_util::TunnelConfig::Ssh {
2482                    config: SshTunnelConfig {
2483                        host: resolved
2484                            .iter()
2485                            .map(|a| a.to_string())
2486                            .collect::<BTreeSet<_>>(),
2487                        port: connection.port,
2488                        user: connection.user.clone(),
2489                        key_pair,
2490                    },
2491                }
2492            }
2493            Tunnel::AwsPrivatelink(connection) => {
2494                assert_none!(connection.port);
2495                mz_mysql_util::TunnelConfig::AwsPrivatelink {
2496                    connection_id: connection.connection_id,
2497                }
2498            }
2499            Tunnel::AwsPrivatelinks(_) => {
2500                unreachable!("MATCHING broker rules are only available for Kafka connections.");
2501            }
2502        };
2503
2504        let aws_config = match self.aws_connection.as_ref() {
2505            None => None,
2506            Some(aws_ref) => Some(
2507                aws_ref
2508                    .connection
2509                    .load_sdk_config(
2510                        &storage_configuration.connection_context,
2511                        aws_ref.connection_id,
2512                        in_task,
2513                        ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2514                    )
2515                    .await?,
2516            ),
2517        };
2518
2519        Ok(mz_mysql_util::Config::new(
2520            opts,
2521            tunnel,
2522            storage_configuration.parameters.ssh_timeout_config,
2523            in_task,
2524            storage_configuration
2525                .parameters
2526                .mysql_source_timeouts
2527                .clone(),
2528            aws_config,
2529        )?)
2530    }
2531
2532    pub async fn validate(
2533        &self,
2534        _id: CatalogItemId,
2535        storage_configuration: &StorageConfiguration,
2536    ) -> Result<MySqlConn, MySqlConnectionValidationError> {
2537        let config = self
2538            .config(
2539                &storage_configuration.connection_context.secrets_reader,
2540                storage_configuration,
2541                // We are in a normal tokio context during validation, already.
2542                InTask::No,
2543            )
2544            .await?;
2545        let mut conn = config
2546            .connect(
2547                "connection validation",
2548                &storage_configuration.connection_context.ssh_tunnel_manager,
2549            )
2550            .await?;
2551
2552        // Check if the MySQL database is configured to allow row-based consistent GTID replication
2553        let mut setting_errors = vec![];
2554        let gtid_res = mz_mysql_util::ensure_gtid_consistency(&mut conn).await;
2555        let binlog_res = mz_mysql_util::ensure_full_row_binlog_format(&mut conn).await;
2556        let order_res = mz_mysql_util::ensure_replication_commit_order(&mut conn).await;
2557        for res in [gtid_res, binlog_res, order_res] {
2558            match res {
2559                Err(MySqlError::InvalidSystemSetting {
2560                    setting,
2561                    expected,
2562                    actual,
2563                }) => {
2564                    setting_errors.push((setting, expected, actual));
2565                }
2566                Err(err) => Err(err)?,
2567                Ok(()) => {}
2568            }
2569        }
2570        if !setting_errors.is_empty() {
2571            Err(MySqlConnectionValidationError::ReplicationSettingsError(
2572                setting_errors,
2573            ))?;
2574        }
2575
2576        Ok(conn)
2577    }
2578}
2579
2580#[derive(Debug, thiserror::Error)]
2581pub enum MySqlConnectionValidationError {
2582    #[error("Invalid MySQL system replication settings")]
2583    ReplicationSettingsError(Vec<(String, String, String)>),
2584    #[error(transparent)]
2585    Client(#[from] MySqlError),
2586    #[error("{}", .0.display_with_causes())]
2587    Other(#[from] anyhow::Error),
2588}
2589
2590impl MySqlConnectionValidationError {
2591    pub fn detail(&self) -> Option<String> {
2592        match self {
2593            Self::ReplicationSettingsError(settings) => Some(format!(
2594                "Invalid MySQL system replication settings: {}",
2595                itertools::join(
2596                    settings.iter().map(|(setting, expected, actual)| format!(
2597                        "{}: expected {}, got {}",
2598                        setting, expected, actual
2599                    )),
2600                    "; "
2601                )
2602            )),
2603            _ => None,
2604        }
2605    }
2606
2607    pub fn hint(&self) -> Option<String> {
2608        match self {
2609            Self::ReplicationSettingsError(_) => {
2610                Some("Set the necessary MySQL database system settings.".into())
2611            }
2612            _ => None,
2613        }
2614    }
2615}
2616
2617impl<C: ConnectionAccess> AlterCompatible for MySqlConnection<C> {
2618    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2619        let MySqlConnection {
2620            tunnel,
2621            // All non-tunnel options may change arbitrarily
2622            host: _,
2623            port: _,
2624            user: _,
2625            password: _,
2626            tls_mode: _,
2627            tls_root_cert: _,
2628            tls_identity: _,
2629            aws_connection: _,
2630        } = self;
2631
2632        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2633
2634        for (compatible, field) in compatibility_checks {
2635            if !compatible {
2636                tracing::warn!(
2637                    "MySqlConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2638                    self,
2639                    other
2640                );
2641
2642                return Err(AlterError { id });
2643            }
2644        }
2645        Ok(())
2646    }
2647}
2648
2649/// Details how to connect to an instance of Microsoft SQL Server.
2650///
2651/// For specifics of connecting to SQL Server for purposes of creating a
2652/// Materialize Source, see [`SqlServerSourceConnection`] which wraps this type.
2653///
2654/// [`SqlServerSourceConnection`]: crate::sources::SqlServerSourceConnection
2655#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2656pub struct SqlServerConnectionDetails<C: ConnectionAccess = InlinedConnection> {
2657    /// The hostname of the server.
2658    pub host: String,
2659    /// The port of the server.
2660    pub port: u16,
2661    /// Database we should connect to.
2662    pub database: String,
2663    /// The username to authenticate as.
2664    pub user: StringOrSecret,
2665    /// Password used for authentication.
2666    pub password: CatalogItemId,
2667    /// A tunnel through which to route traffic.
2668    pub tunnel: Tunnel<C>,
2669    /// Level of encryption to use for the connection.
2670    pub encryption: mz_sql_server_util::config::EncryptionLevel,
2671    /// Certificate validation policy
2672    pub certificate_validation_policy: mz_sql_server_util::config::CertificateValidationPolicy,
2673    /// TLS CA Certifiecate in PEM format
2674    pub tls_root_cert: Option<StringOrSecret>,
2675}
2676
2677impl<C: ConnectionAccess> SqlServerConnectionDetails<C> {
2678    fn validate_by_default(&self) -> bool {
2679        true
2680    }
2681}
2682
2683impl SqlServerConnectionDetails<InlinedConnection> {
2684    /// Attempts to open a connection to the upstream SQL Server instance.
2685    pub async fn validate(
2686        &self,
2687        _id: CatalogItemId,
2688        storage_configuration: &StorageConfiguration,
2689    ) -> Result<mz_sql_server_util::Client, anyhow::Error> {
2690        let config = self
2691            .resolve_config(
2692                &storage_configuration.connection_context.secrets_reader,
2693                storage_configuration,
2694                InTask::No,
2695            )
2696            .await?;
2697        tracing::debug!(?config, "Validating SQL Server connection");
2698
2699        let mut client = mz_sql_server_util::Client::connect(config).await?;
2700
2701        // Ensure the upstream SQL Server instance is configured to allow CDC.
2702        //
2703        // Run all of the checks necessary and collect the errors to provide the best
2704        // guidance as to which system settings need to be enabled.
2705        let mut replication_errors = vec![];
2706        for error in [
2707            mz_sql_server_util::inspect::ensure_database_cdc_enabled(&mut client).await,
2708            mz_sql_server_util::inspect::ensure_snapshot_isolation_enabled(&mut client).await,
2709            mz_sql_server_util::inspect::ensure_sql_server_agent_running(&mut client).await,
2710        ] {
2711            match error {
2712                Err(mz_sql_server_util::SqlServerError::InvalidSystemSetting {
2713                    name,
2714                    expected,
2715                    actual,
2716                }) => replication_errors.push((name, expected, actual)),
2717                Err(other) => Err(other)?,
2718                Ok(()) => (),
2719            }
2720        }
2721        if !replication_errors.is_empty() {
2722            Err(SqlServerConnectionValidationError::ReplicationSettingsError(replication_errors))?;
2723        }
2724
2725        Ok(client)
2726    }
2727
2728    /// Resolve all of the connection details (e.g. read from the [`SecretsReader`])
2729    /// so the returned [`Config`] can be used to open a connection with the
2730    /// upstream system.
2731    ///
2732    /// The provided [`InTask`] argument determines whether any I/O is run in an
2733    /// [`mz_ore::task`] (i.e. a different thread) or directly in the returned
2734    /// future. The main goal here is to prevent running I/O in timely threads.
2735    ///
2736    /// [`Config`]: mz_sql_server_util::Config
2737    pub async fn resolve_config(
2738        &self,
2739        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
2740        storage_configuration: &StorageConfiguration,
2741        in_task: InTask,
2742    ) -> Result<mz_sql_server_util::Config, anyhow::Error> {
2743        let dyncfg = storage_configuration.config_set();
2744        let mut inner_config = tiberius::Config::new();
2745
2746        // Setup default connection params.
2747        inner_config.host(&self.host);
2748        inner_config.port(self.port);
2749        inner_config.database(self.database.clone());
2750        inner_config.encryption(self.encryption.into());
2751        match self.certificate_validation_policy {
2752            mz_sql_server_util::config::CertificateValidationPolicy::TrustAll => {
2753                inner_config.trust_cert()
2754            }
2755            mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA => {
2756                inner_config.trust_cert_ca_pem(
2757                    self.tls_root_cert
2758                        .as_ref()
2759                        .unwrap()
2760                        .get_string(in_task, secrets_reader)
2761                        .await
2762                        .context("ca certificate")?,
2763                );
2764            }
2765            mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem => (), // no-op
2766        }
2767
2768        inner_config.application_name("materialize");
2769
2770        // Read our auth settings from
2771        let user = self
2772            .user
2773            .get_string(in_task, secrets_reader)
2774            .await
2775            .context("username")?;
2776        let password = secrets_reader
2777            .read_string_in_task_if(in_task, self.password)
2778            .await
2779            .context("password")?;
2780        // TODO(sql_server3): Support other methods of authentication besides
2781        // username and password.
2782        inner_config.authentication(tiberius::AuthMethod::sql_server(user, password));
2783
2784        // Prevent users from probing our internal network ports by trying to
2785        // connect to localhost, or another non-external IP.
2786        let enforce_external_addresses = ENFORCE_EXTERNAL_ADDRESSES.get(dyncfg);
2787
2788        let tunnel = match &self.tunnel {
2789            Tunnel::Direct => {
2790                let resolved_addresses: Vec<SocketAddr> =
2791                    resolve_address(&self.host, enforce_external_addresses)
2792                        .await?
2793                        .into_iter()
2794                        .map(|ip| SocketAddr::new(ip, self.port))
2795                        .collect();
2796                mz_sql_server_util::config::TunnelConfig::Direct {
2797                    resolved_addresses: resolved_addresses.into_boxed_slice(),
2798                }
2799            }
2800            Tunnel::Ssh(SshTunnel {
2801                connection_id,
2802                connection: ssh_connection,
2803            }) => {
2804                let secret = secrets_reader
2805                    .read_in_task_if(in_task, *connection_id)
2806                    .await
2807                    .context("ssh secret")?;
2808                let key_pair = SshKeyPair::from_bytes(&secret).context("ssh key pair")?;
2809                // Ensure any SSH-bastion host we connect to is resolved to an
2810                // external address.
2811                let addresses = resolve_address(&ssh_connection.host, enforce_external_addresses)
2812                    .await
2813                    .context("ssh tunnel")?;
2814
2815                let config = SshTunnelConfig {
2816                    host: addresses.into_iter().map(|a| a.to_string()).collect(),
2817                    port: ssh_connection.port,
2818                    user: ssh_connection.user.clone(),
2819                    key_pair,
2820                };
2821                mz_sql_server_util::config::TunnelConfig::Ssh {
2822                    config,
2823                    manager: storage_configuration
2824                        .connection_context
2825                        .ssh_tunnel_manager
2826                        .clone(),
2827                    timeout: storage_configuration.parameters.ssh_timeout_config.clone(),
2828                    host: self.host.clone(),
2829                    port: self.port,
2830                }
2831            }
2832            Tunnel::AwsPrivatelink(private_link_connection) => {
2833                assert_none!(private_link_connection.port);
2834                mz_sql_server_util::config::TunnelConfig::AwsPrivatelink {
2835                    connection_id: private_link_connection.connection_id,
2836                    port: self.port,
2837                }
2838            }
2839            Tunnel::AwsPrivatelinks(_) => {
2840                unreachable!("MATCHING broker rules are only available for Kafka connections.");
2841            }
2842        };
2843
2844        Ok(mz_sql_server_util::Config::new(
2845            inner_config,
2846            tunnel,
2847            in_task,
2848        ))
2849    }
2850}
2851
2852#[derive(Debug, Clone, thiserror::Error)]
2853pub enum SqlServerConnectionValidationError {
2854    #[error("Invalid SQL Server system replication settings")]
2855    ReplicationSettingsError(Vec<(String, String, String)>),
2856}
2857
2858impl SqlServerConnectionValidationError {
2859    pub fn detail(&self) -> Option<String> {
2860        match self {
2861            Self::ReplicationSettingsError(settings) => Some(format!(
2862                "Invalid SQL Server system replication settings: {}",
2863                itertools::join(
2864                    settings.iter().map(|(setting, expected, actual)| format!(
2865                        "{}: expected {}, got {}",
2866                        setting, expected, actual
2867                    )),
2868                    "; "
2869                )
2870            )),
2871        }
2872    }
2873
2874    pub fn hint(&self) -> Option<String> {
2875        match self {
2876            _ => None,
2877        }
2878    }
2879}
2880
2881impl<R: ConnectionResolver> IntoInlineConnection<SqlServerConnectionDetails, R>
2882    for SqlServerConnectionDetails<ReferencedConnection>
2883{
2884    fn into_inline_connection(self, r: R) -> SqlServerConnectionDetails {
2885        let SqlServerConnectionDetails {
2886            host,
2887            port,
2888            database,
2889            user,
2890            password,
2891            tunnel,
2892            encryption,
2893            certificate_validation_policy,
2894            tls_root_cert,
2895        } = self;
2896
2897        SqlServerConnectionDetails {
2898            host,
2899            port,
2900            database,
2901            user,
2902            password,
2903            tunnel: tunnel.into_inline_connection(&r),
2904            encryption,
2905            certificate_validation_policy,
2906            tls_root_cert,
2907        }
2908    }
2909}
2910
2911impl<C: ConnectionAccess> AlterCompatible for SqlServerConnectionDetails<C> {
2912    fn alter_compatible(
2913        &self,
2914        id: mz_repr::GlobalId,
2915        other: &Self,
2916    ) -> Result<(), crate::controller::AlterError> {
2917        let SqlServerConnectionDetails {
2918            tunnel,
2919            // TODO(sql_server2): Figure out how these variables are allowed to change.
2920            host: _,
2921            port: _,
2922            database: _,
2923            user: _,
2924            password: _,
2925            encryption: _,
2926            certificate_validation_policy: _,
2927            tls_root_cert: _,
2928        } = self;
2929
2930        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2931
2932        for (compatible, field) in compatibility_checks {
2933            if !compatible {
2934                tracing::warn!(
2935                    "SqlServerConnectionDetails incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2936                    self,
2937                    other
2938                );
2939
2940                return Err(AlterError { id });
2941            }
2942        }
2943        Ok(())
2944    }
2945}
2946
2947/// A connection to an SSH tunnel.
2948#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2949pub struct SshConnection {
2950    pub host: String,
2951    pub port: u16,
2952    pub user: String,
2953}
2954
2955use self::inline::{
2956    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
2957    ReferencedConnection,
2958};
2959
2960impl AlterCompatible for SshConnection {
2961    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
2962        // Every element of the SSH connection is configurable.
2963        Ok(())
2964    }
2965}
2966
2967/// Specifies an AWS PrivateLink service for a [`Tunnel`].
2968#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2969pub struct AwsPrivatelink {
2970    /// The ID of the connection to the AWS PrivateLink service.
2971    pub connection_id: CatalogItemId,
2972    // The availability zone to use when connecting to the AWS PrivateLink service.
2973    pub availability_zone: Option<String>,
2974    /// The port to use when connecting to the AWS PrivateLink service, if
2975    /// different from the port in [`KafkaBroker::address`].
2976    pub port: Option<u16>,
2977}
2978
2979impl AlterCompatible for AwsPrivatelink {
2980    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2981        let AwsPrivatelink {
2982            connection_id,
2983            availability_zone: _,
2984            port: _,
2985        } = self;
2986
2987        let compatibility_checks = [(connection_id == &other.connection_id, "connection_id")];
2988
2989        for (compatible, field) in compatibility_checks {
2990            if !compatible {
2991                tracing::warn!(
2992                    "AwsPrivatelink incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2993                    self,
2994                    other
2995                );
2996
2997                return Err(AlterError { id });
2998            }
2999        }
3000
3001        Ok(())
3002    }
3003}
3004
3005#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
3006pub struct AwsPrivatelinks {
3007    /// Route to brokers through PrivateLink connections according to these rules.
3008    /// Exact-match rules (no wildcards) are used as bootstrap brokers.
3009    /// Wildcard rules are applied dynamically to discovered brokers.
3010    pub rules: Vec<AwsPrivatelinkRule>,
3011}
3012
3013#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
3014pub struct AwsPrivatelinkRule {
3015    /// Given a broker's host:port, should we use this route?
3016    pub pattern: ConnectionRulePattern,
3017    /// Route to the broker through this PrivateLink connection.
3018    pub to: AwsPrivatelink,
3019}
3020
3021/// Specifies an SSH tunnel connection.
3022#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
3023pub struct SshTunnel<C: ConnectionAccess = InlinedConnection> {
3024    /// id of the ssh connection
3025    pub connection_id: CatalogItemId,
3026    /// ssh connection object
3027    pub connection: C::Ssh,
3028}
3029
3030impl<R: ConnectionResolver> IntoInlineConnection<SshTunnel, R> for SshTunnel<ReferencedConnection> {
3031    fn into_inline_connection(self, r: R) -> SshTunnel {
3032        let SshTunnel {
3033            connection,
3034            connection_id,
3035        } = self;
3036
3037        SshTunnel {
3038            connection: r.resolve_connection(connection).unwrap_ssh(),
3039            connection_id,
3040        }
3041    }
3042}
3043
3044impl SshTunnel<InlinedConnection> {
3045    /// Like [`SshTunnelConfig::connect`], but the SSH key is loaded from a
3046    /// secret.
3047    async fn connect(
3048        &self,
3049        storage_configuration: &StorageConfiguration,
3050        remote_host: &str,
3051        remote_port: u16,
3052        in_task: InTask,
3053    ) -> Result<ManagedSshTunnelHandle, anyhow::Error> {
3054        // Ensure any ssh-bastion host we connect to is resolved to an external address.
3055        let resolved = resolve_address(
3056            &self.connection.host,
3057            ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
3058        )
3059        .await?;
3060        storage_configuration
3061            .connection_context
3062            .ssh_tunnel_manager
3063            .connect(
3064                SshTunnelConfig {
3065                    host: resolved
3066                        .iter()
3067                        .map(|a| a.to_string())
3068                        .collect::<BTreeSet<_>>(),
3069                    port: self.connection.port,
3070                    user: self.connection.user.clone(),
3071                    key_pair: SshKeyPair::from_bytes(
3072                        &storage_configuration
3073                            .connection_context
3074                            .secrets_reader
3075                            .read_in_task_if(in_task, self.connection_id)
3076                            .await?,
3077                    )?,
3078                },
3079                remote_host,
3080                remote_port,
3081                storage_configuration.parameters.ssh_timeout_config,
3082                in_task,
3083            )
3084            .await
3085    }
3086}
3087
3088impl<C: ConnectionAccess> AlterCompatible for SshTunnel<C> {
3089    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
3090        let SshTunnel {
3091            connection_id,
3092            connection,
3093        } = self;
3094
3095        let compatibility_checks = [
3096            (connection_id == &other.connection_id, "connection_id"),
3097            (
3098                connection.alter_compatible(id, &other.connection).is_ok(),
3099                "connection",
3100            ),
3101        ];
3102
3103        for (compatible, field) in compatibility_checks {
3104            if !compatible {
3105                tracing::warn!(
3106                    "SshTunnel incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
3107                    self,
3108                    other
3109                );
3110
3111                return Err(AlterError { id });
3112            }
3113        }
3114
3115        Ok(())
3116    }
3117}
3118
3119impl SshConnection {
3120    #[allow(clippy::unused_async)]
3121    async fn validate(
3122        &self,
3123        id: CatalogItemId,
3124        storage_configuration: &StorageConfiguration,
3125    ) -> Result<(), anyhow::Error> {
3126        let secret = storage_configuration
3127            .connection_context
3128            .secrets_reader
3129            .read_in_task_if(
3130                // We are in a normal tokio context during validation, already.
3131                InTask::No,
3132                id,
3133            )
3134            .await?;
3135        let key_pair = SshKeyPair::from_bytes(&secret)?;
3136
3137        // Ensure any ssh-bastion host we connect to is resolved to an external address.
3138        let resolved = resolve_address(
3139            &self.host,
3140            ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
3141        )
3142        .await?;
3143
3144        let config = SshTunnelConfig {
3145            host: resolved
3146                .iter()
3147                .map(|a| a.to_string())
3148                .collect::<BTreeSet<_>>(),
3149            port: self.port,
3150            user: self.user.clone(),
3151            key_pair,
3152        };
3153        // Note that we do NOT use the `SshTunnelManager` here, as we want to validate that we
3154        // can actually create a new connection to the ssh bastion, without tunneling.
3155        config
3156            .validate(storage_configuration.parameters.ssh_timeout_config)
3157            .await
3158    }
3159
3160    fn validate_by_default(&self) -> bool {
3161        false
3162    }
3163}
3164
3165impl AwsPrivatelinkConnection {
3166    #[allow(clippy::unused_async)]
3167    async fn validate(
3168        &self,
3169        id: CatalogItemId,
3170        storage_configuration: &StorageConfiguration,
3171    ) -> Result<(), anyhow::Error> {
3172        let Some(ref cloud_resource_reader) = storage_configuration
3173            .connection_context
3174            .cloud_resource_reader
3175        else {
3176            return Err(anyhow!("AWS PrivateLink connections are unsupported"));
3177        };
3178
3179        // No need to optionally run this in a task, as we are just validating from envd.
3180        let status = cloud_resource_reader.read(id).await?;
3181
3182        let availability = status
3183            .conditions
3184            .as_ref()
3185            .and_then(|conditions| conditions.iter().find(|c| c.type_ == "Available"));
3186
3187        match availability {
3188            Some(condition) if condition.status == "True" => Ok(()),
3189            Some(condition) => Err(anyhow!("{}", condition.message)),
3190            None => Err(anyhow!("Endpoint availability is unknown")),
3191        }
3192    }
3193
3194    fn validate_by_default(&self) -> bool {
3195        false
3196    }
3197}