mz_storage_types/
connections.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Connection types.
11
12use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::net::SocketAddr;
15use std::sync::Arc;
16
17use anyhow::{Context, anyhow};
18use iceberg::{Catalog, CatalogBuilder};
19use iceberg_catalog_rest::{
20    REST_CATALOG_PROP_URI, REST_CATALOG_PROP_WAREHOUSE, RestCatalogBuilder,
21};
22use itertools::Itertools;
23use mz_ccsr::tls::{Certificate, Identity};
24use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceReader, vpc_endpoint_host};
25use mz_dyncfg::ConfigSet;
26use mz_kafka_util::client::{
27    BrokerAddr, BrokerRewrite, MzClientContext, MzKafkaError, TunnelConfig, TunnelingClientContext,
28};
29use mz_mysql_util::{MySqlConn, MySqlError};
30use mz_ore::assert_none;
31use mz_ore::error::ErrorExt;
32use mz_ore::future::{InTask, OreFutureExt};
33use mz_ore::netio::DUMMY_DNS_PORT;
34use mz_ore::netio::resolve_address;
35use mz_ore::num::NonNeg;
36use mz_repr::{CatalogItemId, GlobalId};
37use mz_secrets::SecretsReader;
38use mz_ssh_util::keys::SshKeyPair;
39use mz_ssh_util::tunnel::SshTunnelConfig;
40use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager};
41use mz_tls_util::Pkcs12Archive;
42use mz_tracing::CloneableEnvFilter;
43use rdkafka::ClientContext;
44use rdkafka::config::FromClientConfigAndContext;
45use rdkafka::consumer::{BaseConsumer, Consumer};
46use regex::Regex;
47use serde::{Deserialize, Deserializer, Serialize};
48use tokio::net;
49use tokio::runtime::Handle;
50use tokio_postgres::config::SslMode;
51use tracing::{debug, warn};
52use url::Url;
53
54use crate::AlterCompatible;
55use crate::configuration::StorageConfiguration;
56use crate::connections::aws::{
57    AwsConnection, AwsConnectionReference, AwsConnectionValidationError,
58};
59use crate::connections::string_or_secret::StringOrSecret;
60use crate::controller::AlterError;
61use crate::dyncfgs::{
62    ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES,
63    KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM, KAFKA_RECONNECT_BACKOFF,
64    KAFKA_RECONNECT_BACKOFF_MAX, KAFKA_RETRY_BACKOFF, KAFKA_RETRY_BACKOFF_MAX,
65};
66use crate::errors::{ContextCreationError, CsrConnectError};
67
68pub mod aws;
69pub mod inline;
70pub mod string_or_secret;
71
/// Catalog property key under which the optional OAuth2 scope of a REST
/// Iceberg catalog is handed to the catalog client (see `connect_rest`).
const REST_CATALOG_PROP_SCOPE: &str = "scope";
/// Catalog property key under which the OAuth2 credential of a REST Iceberg
/// catalog is handed to the catalog client (see `connect_rest`).
const REST_CATALOG_PROP_CREDENTIAL: &str = "credential";
74
/// An extension trait for [`SecretsReader`] that lets callers choose, via an
/// [`InTask`] flag, whether a secret read runs in a separate task.
#[async_trait::async_trait]
trait SecretsReaderExt {
    /// `SecretsReader::read`, but optionally run in a task.
    ///
    /// Returns the raw bytes of the secret identified by `id`.
    async fn read_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<Vec<u8>, anyhow::Error>;

    /// `SecretsReader::read_string`, but optionally run in a task.
    ///
    /// Returns the secret identified by `id` as a `String`.
    async fn read_string_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<String, anyhow::Error>;
}
92
93#[async_trait::async_trait]
94impl SecretsReaderExt for Arc<dyn SecretsReader> {
95    async fn read_in_task_if(
96        &self,
97        in_task: InTask,
98        id: CatalogItemId,
99    ) -> Result<Vec<u8>, anyhow::Error> {
100        let sr = Arc::clone(self);
101        async move { sr.read(id).await }
102            .run_in_task_if(in_task, || "secrets_reader_read".to_string())
103            .await
104    }
105    async fn read_string_in_task_if(
106        &self,
107        in_task: InTask,
108        id: CatalogItemId,
109    ) -> Result<String, anyhow::Error> {
110        let sr = Arc::clone(self);
111        async move { sr.read_string(id).await }
112            .run_in_task_if(in_task, || "secrets_reader_read".to_string())
113            .await
114    }
115}
116
/// Extra context to pass through when instantiating a connection for a source
/// or sink.
///
/// Should be kept cheaply cloneable: the shared readers are held behind `Arc`s.
#[derive(Debug, Clone)]
pub struct ConnectionContext {
    /// An opaque identifier for the environment in which this process is
    /// running.
    ///
    /// The storage layer is intentionally unaware of the structure within this
    /// identifier. Higher layers of the stack can make use of that structure,
    /// but the storage layer should be oblivious to it.
    pub environment_id: String,
    /// The level for librdkafka's logs.
    pub librdkafka_log_level: tracing::Level,
    /// A prefix for an external ID to use for all AWS AssumeRole operations.
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    /// The ARN for a Materialize-controlled role to assume before assuming
    /// a customer's requested role for an AWS connection.
    pub aws_connection_role_arn: Option<String>,
    /// A secrets reader, used to resolve secret-valued connection options.
    pub secrets_reader: Arc<dyn SecretsReader>,
    /// A cloud resource reader, if supported in this configuration.
    pub cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
    /// A manager for SSH tunnels.
    pub ssh_tunnel_manager: SshTunnelManager,
}
144
145impl ConnectionContext {
146    /// Constructs a new connection context from command line arguments.
147    ///
148    /// **WARNING:** it is critical for security that the `aws_external_id` be
149    /// provided by the operator of the Materialize service (i.e., via a CLI
150    /// argument or environment variable) and not the end user of Materialize
151    /// (e.g., via a configuration option in a SQL statement). See
152    /// [`AwsExternalIdPrefix`] for details.
153    pub fn from_cli_args(
154        environment_id: String,
155        startup_log_level: &CloneableEnvFilter,
156        aws_external_id_prefix: Option<AwsExternalIdPrefix>,
157        aws_connection_role_arn: Option<String>,
158        secrets_reader: Arc<dyn SecretsReader>,
159        cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
160    ) -> ConnectionContext {
161        ConnectionContext {
162            environment_id,
163            librdkafka_log_level: mz_ore::tracing::crate_level(
164                &startup_log_level.clone().into(),
165                "librdkafka",
166            ),
167            aws_external_id_prefix,
168            aws_connection_role_arn,
169            secrets_reader,
170            cloud_resource_reader,
171            ssh_tunnel_manager: SshTunnelManager::default(),
172        }
173    }
174
175    /// Constructs a new connection context for usage in tests.
176    pub fn for_tests(secrets_reader: Arc<dyn SecretsReader>) -> ConnectionContext {
177        ConnectionContext {
178            environment_id: "test-environment-id".into(),
179            librdkafka_log_level: tracing::Level::INFO,
180            aws_external_id_prefix: Some(
181                AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable(
182                    "test-aws-external-id-prefix",
183                )
184                .expect("infallible"),
185            ),
186            aws_connection_role_arn: Some(
187                "arn:aws:iam::123456789000:role/MaterializeConnection".into(),
188            ),
189            secrets_reader,
190            cloud_resource_reader: None,
191            ssh_tunnel_manager: SshTunnelManager::default(),
192        }
193    }
194}
195
/// A connection to an external system.
///
/// The type parameter `C` selects how nested references to other connections
/// are represented: [`ReferencedConnection`] keeps them as references, and
/// [`InlinedConnection`] (the default) holds them resolved. The
/// `IntoInlineConnection` impl converts the former into the latter.
/// `Ssh`, `Aws`, and `AwsPrivatelink` are not generic over `C`, so they carry
/// no nested references.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Connection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaConnection<C>),
    Csr(CsrConnection<C>),
    Postgres(PostgresConnection<C>),
    Ssh(SshConnection),
    Aws(AwsConnection),
    AwsPrivatelink(AwsPrivatelinkConnection),
    MySql(MySqlConnection<C>),
    SqlServer(SqlServerConnectionDetails<C>),
    IcebergCatalog(IcebergCatalogConnection<C>),
}
208
209impl<R: ConnectionResolver> IntoInlineConnection<Connection, R>
210    for Connection<ReferencedConnection>
211{
212    fn into_inline_connection(self, r: R) -> Connection {
213        match self {
214            Connection::Kafka(kafka) => Connection::Kafka(kafka.into_inline_connection(r)),
215            Connection::Csr(csr) => Connection::Csr(csr.into_inline_connection(r)),
216            Connection::Postgres(pg) => Connection::Postgres(pg.into_inline_connection(r)),
217            Connection::Ssh(ssh) => Connection::Ssh(ssh),
218            Connection::Aws(aws) => Connection::Aws(aws),
219            Connection::AwsPrivatelink(awspl) => Connection::AwsPrivatelink(awspl),
220            Connection::MySql(mysql) => Connection::MySql(mysql.into_inline_connection(r)),
221            Connection::SqlServer(sql_server) => {
222                Connection::SqlServer(sql_server.into_inline_connection(r))
223            }
224            Connection::IcebergCatalog(iceberg) => {
225                Connection::IcebergCatalog(iceberg.into_inline_connection(r))
226            }
227        }
228    }
229}
230
231impl<C: ConnectionAccess> Connection<C> {
232    /// Whether this connection should be validated by default on creation.
233    pub fn validate_by_default(&self) -> bool {
234        match self {
235            Connection::Kafka(conn) => conn.validate_by_default(),
236            Connection::Csr(conn) => conn.validate_by_default(),
237            Connection::Postgres(conn) => conn.validate_by_default(),
238            Connection::Ssh(conn) => conn.validate_by_default(),
239            Connection::Aws(conn) => conn.validate_by_default(),
240            Connection::AwsPrivatelink(conn) => conn.validate_by_default(),
241            Connection::MySql(conn) => conn.validate_by_default(),
242            Connection::SqlServer(conn) => conn.validate_by_default(),
243            Connection::IcebergCatalog(conn) => conn.validate_by_default(),
244        }
245    }
246}
247
248impl Connection<InlinedConnection> {
249    /// Validates this connection by attempting to connect to the upstream system.
250    pub async fn validate(
251        &self,
252        id: CatalogItemId,
253        storage_configuration: &StorageConfiguration,
254    ) -> Result<(), ConnectionValidationError> {
255        match self {
256            Connection::Kafka(conn) => conn.validate(id, storage_configuration).await?,
257            Connection::Csr(conn) => conn.validate(id, storage_configuration).await?,
258            Connection::Postgres(conn) => {
259                conn.validate(id, storage_configuration).await?;
260            }
261            Connection::Ssh(conn) => conn.validate(id, storage_configuration).await?,
262            Connection::Aws(conn) => conn.validate(id, storage_configuration).await?,
263            Connection::AwsPrivatelink(conn) => conn.validate(id, storage_configuration).await?,
264            Connection::MySql(conn) => {
265                conn.validate(id, storage_configuration).await?;
266            }
267            Connection::SqlServer(conn) => {
268                conn.validate(id, storage_configuration).await?;
269            }
270            Connection::IcebergCatalog(conn) => conn.validate(id, storage_configuration).await?,
271        }
272        Ok(())
273    }
274
275    pub fn unwrap_kafka(self) -> <InlinedConnection as ConnectionAccess>::Kafka {
276        match self {
277            Self::Kafka(conn) => conn,
278            o => unreachable!("{o:?} is not a Kafka connection"),
279        }
280    }
281
282    pub fn unwrap_pg(self) -> <InlinedConnection as ConnectionAccess>::Pg {
283        match self {
284            Self::Postgres(conn) => conn,
285            o => unreachable!("{o:?} is not a Postgres connection"),
286        }
287    }
288
289    pub fn unwrap_mysql(self) -> <InlinedConnection as ConnectionAccess>::MySql {
290        match self {
291            Self::MySql(conn) => conn,
292            o => unreachable!("{o:?} is not a MySQL connection"),
293        }
294    }
295
296    pub fn unwrap_sql_server(self) -> <InlinedConnection as ConnectionAccess>::SqlServer {
297        match self {
298            Self::SqlServer(conn) => conn,
299            o => unreachable!("{o:?} is not a SQL Server connection"),
300        }
301    }
302
303    pub fn unwrap_aws(self) -> <InlinedConnection as ConnectionAccess>::Aws {
304        match self {
305            Self::Aws(conn) => conn,
306            o => unreachable!("{o:?} is not an AWS connection"),
307        }
308    }
309
310    pub fn unwrap_ssh(self) -> <InlinedConnection as ConnectionAccess>::Ssh {
311        match self {
312            Self::Ssh(conn) => conn,
313            o => unreachable!("{o:?} is not an SSH connection"),
314        }
315    }
316
317    pub fn unwrap_csr(self) -> <InlinedConnection as ConnectionAccess>::Csr {
318        match self {
319            Self::Csr(conn) => conn,
320            o => unreachable!("{o:?} is not a Kafka connection"),
321        }
322    }
323
324    pub fn unwrap_iceberg_catalog(self) -> <InlinedConnection as ConnectionAccess>::IcebergCatalog {
325        match self {
326            Self::IcebergCatalog(conn) => conn,
327            o => unreachable!("{o:?} is not an Iceberg catalog connection"),
328        }
329    }
330}
331
/// An error returned by [`Connection::validate`].
///
/// The Postgres, MySQL, SQL Server, and AWS variants keep their typed errors,
/// which supply the extra `detail` and `hint` reported by
/// [`ConnectionValidationError`]'s accessors. Any other failure is carried in
/// `Other`.
#[derive(thiserror::Error, Debug)]
pub enum ConnectionValidationError {
    #[error(transparent)]
    Postgres(#[from] PostgresConnectionValidationError),
    #[error(transparent)]
    MySql(#[from] MySqlConnectionValidationError),
    #[error(transparent)]
    SqlServer(#[from] SqlServerConnectionValidationError),
    #[error(transparent)]
    Aws(#[from] AwsConnectionValidationError),
    /// A failure with no dedicated variant, rendered with
    /// `ErrorExt::display_with_causes`.
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}
346
347impl ConnectionValidationError {
348    /// Reports additional details about the error, if any are available.
349    pub fn detail(&self) -> Option<String> {
350        match self {
351            ConnectionValidationError::Postgres(e) => e.detail(),
352            ConnectionValidationError::MySql(e) => e.detail(),
353            ConnectionValidationError::SqlServer(e) => e.detail(),
354            ConnectionValidationError::Aws(e) => e.detail(),
355            ConnectionValidationError::Other(_) => None,
356        }
357    }
358
359    /// Reports a hint for the user about how the error could be fixed.
360    pub fn hint(&self) -> Option<String> {
361        match self {
362            ConnectionValidationError::Postgres(e) => e.hint(),
363            ConnectionValidationError::MySql(e) => e.hint(),
364            ConnectionValidationError::SqlServer(e) => e.hint(),
365            ConnectionValidationError::Aws(e) => e.hint(),
366            ConnectionValidationError::Other(_) => None,
367        }
368    }
369}
370
371impl<C: ConnectionAccess> AlterCompatible for Connection<C> {
372    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
373        match (self, other) {
374            (Self::Aws(s), Self::Aws(o)) => s.alter_compatible(id, o),
375            (Self::AwsPrivatelink(s), Self::AwsPrivatelink(o)) => s.alter_compatible(id, o),
376            (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o),
377            (Self::Csr(s), Self::Csr(o)) => s.alter_compatible(id, o),
378            (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
379            (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
380            (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
381            _ => {
382                tracing::warn!(
383                    "Connection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
384                    self,
385                    other
386                );
387                Err(AlterError { id })
388            }
389        }
390    }
391}
392
/// Configuration for a generic Iceberg REST catalog.
// NOTE(review): this derives `Debug`, and `Connection::alter_compatible` logs
// `{:#?}` of both connections for any pair without a dedicated arm, which
// includes Iceberg catalogs. If `credential` holds an inline string rather
// than a secret reference, and `StringOrSecret`'s `Debug` prints it, the
// credential could reach the logs. Confirm `StringOrSecret`'s `Debug` behavior.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct RestIcebergCatalog {
    /// For REST catalogs, the oauth2 credential in a `CLIENT_ID:CLIENT_SECRET` format
    pub credential: StringOrSecret,
    /// The oauth2 scope for REST catalogs, if any
    pub scope: Option<String>,
    /// The warehouse for REST catalogs, if any
    pub warehouse: Option<String>,
}
402
/// Configuration for an AWS S3 Tables catalog accessed through its Iceberg
/// REST endpoint.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct S3TablesRestIcebergCatalog<C: ConnectionAccess = InlinedConnection> {
    /// The AWS connection whose SDK configuration is used to build the
    /// catalog client's AWS client.
    pub aws_connection: AwsConnectionReference<C>,
    /// The warehouse for s3tables, passed to the catalog as its warehouse
    /// property.
    // NOTE(review): presumably the S3 table bucket identifier; confirm.
    pub warehouse: String,
}
410
411impl<R: ConnectionResolver> IntoInlineConnection<S3TablesRestIcebergCatalog, R>
412    for S3TablesRestIcebergCatalog<ReferencedConnection>
413{
414    fn into_inline_connection(self, r: R) -> S3TablesRestIcebergCatalog {
415        S3TablesRestIcebergCatalog {
416            aws_connection: self.aws_connection.into_inline_connection(&r),
417            warehouse: self.warehouse,
418        }
419    }
420}
421
/// The kinds of Iceberg catalog that can be configured; one per variant of
/// [`IcebergCatalogImpl`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogType {
    /// A generic Iceberg REST catalog.
    Rest,
    /// An AWS S3 Tables catalog, accessed through its Iceberg REST endpoint.
    S3TablesRest,
}
427
/// Flavor-specific configuration of an Iceberg catalog connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogImpl<C: ConnectionAccess = InlinedConnection> {
    /// A generic REST catalog authenticated with an OAuth2 credential.
    Rest(RestIcebergCatalog),
    /// An S3 Tables catalog authenticated through an AWS connection.
    S3TablesRest(S3TablesRestIcebergCatalog<C>),
}
433
434impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogImpl, R>
435    for IcebergCatalogImpl<ReferencedConnection>
436{
437    fn into_inline_connection(self, r: R) -> IcebergCatalogImpl {
438        match self {
439            IcebergCatalogImpl::Rest(rest) => IcebergCatalogImpl::Rest(rest),
440            IcebergCatalogImpl::S3TablesRest(s3tables) => {
441                IcebergCatalogImpl::S3TablesRest(s3tables.into_inline_connection(r))
442            }
443        }
444    }
445}
446
/// A connection to an Iceberg catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergCatalogConnection<C: ConnectionAccess = InlinedConnection> {
    /// The catalog implementation (flavor-specific configuration) for this connection.
    pub catalog: IcebergCatalogImpl<C>,
    /// The endpoint at which the catalog is located; passed to the catalog
    /// client as its URI property.
    pub uri: reqwest::Url,
}
454
455impl AlterCompatible for IcebergCatalogConnection {
456    fn alter_compatible(&self, id: GlobalId, _other: &Self) -> Result<(), AlterError> {
457        Err(AlterError { id })
458    }
459}
460
461impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogConnection, R>
462    for IcebergCatalogConnection<ReferencedConnection>
463{
464    fn into_inline_connection(self, r: R) -> IcebergCatalogConnection {
465        IcebergCatalogConnection {
466            catalog: self.catalog.into_inline_connection(&r),
467            uri: self.uri,
468        }
469    }
470}
471
472impl<C: ConnectionAccess> IcebergCatalogConnection<C> {
473    fn validate_by_default(&self) -> bool {
474        true
475    }
476}
477
478impl IcebergCatalogConnection<InlinedConnection> {
479    pub async fn connect(
480        &self,
481        storage_configuration: &StorageConfiguration,
482        in_task: InTask,
483    ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
484        match self.catalog {
485            IcebergCatalogImpl::S3TablesRest(ref s3tables) => {
486                self.connect_s3tables(s3tables, storage_configuration, in_task)
487                    .await
488            }
489            IcebergCatalogImpl::Rest(ref rest) => {
490                self.connect_rest(rest, storage_configuration, in_task)
491                    .await
492            }
493        }
494    }
495
496    async fn connect_s3tables(
497        &self,
498        s3tables: &S3TablesRestIcebergCatalog,
499        storage_configuration: &StorageConfiguration,
500        in_task: InTask,
501    ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
502        let mut props = BTreeMap::from([(
503            REST_CATALOG_PROP_URI.to_string(),
504            self.uri.to_string().clone(),
505        )]);
506
507        let aws_ref = &s3tables.aws_connection;
508        let aws_config = aws_ref
509            .connection
510            .load_sdk_config(
511                &storage_configuration.connection_context,
512                aws_ref.connection_id,
513                in_task,
514            )
515            .await?;
516
517        props.insert(
518            REST_CATALOG_PROP_WAREHOUSE.to_string(),
519            s3tables.warehouse.clone(),
520        );
521
522        let catalog = RestCatalogBuilder::default()
523            .with_aws_client(aws_config)
524            .load("IcebergCatalog", props.into_iter().collect())
525            .await
526            .map_err(|e| anyhow!("failed to create Iceberg catalog: {e}"))?;
527        Ok(Arc::new(catalog))
528    }
529
530    async fn connect_rest(
531        &self,
532        rest: &RestIcebergCatalog,
533        storage_configuration: &StorageConfiguration,
534        in_task: InTask,
535    ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
536        let mut props = BTreeMap::from([(
537            REST_CATALOG_PROP_URI.to_string(),
538            self.uri.to_string().clone(),
539        )]);
540
541        if let Some(warehouse) = &rest.warehouse {
542            props.insert(REST_CATALOG_PROP_WAREHOUSE.to_string(), warehouse.clone());
543        }
544
545        let credential = rest
546            .credential
547            .get_string(
548                in_task,
549                &storage_configuration.connection_context.secrets_reader,
550            )
551            .await
552            .map_err(|e| anyhow!("failed to read Iceberg catalog credential: {e}"))?;
553        props.insert(REST_CATALOG_PROP_CREDENTIAL.to_string(), credential);
554
555        if let Some(scope) = &rest.scope {
556            props.insert(REST_CATALOG_PROP_SCOPE.to_string(), scope.clone());
557        }
558
559        let catalog = RestCatalogBuilder::default()
560            .load("IcebergCatalog", props.into_iter().collect())
561            .await
562            .map_err(|e| anyhow!("failed to create Iceberg catalog: {e}"))?;
563        Ok(Arc::new(catalog))
564    }
565
566    async fn validate(
567        &self,
568        _id: CatalogItemId,
569        storage_configuration: &StorageConfiguration,
570    ) -> Result<(), ConnectionValidationError> {
571        let catalog = self
572            .connect(storage_configuration, InTask::No)
573            .await
574            .map_err(|e| {
575                ConnectionValidationError::Other(anyhow!("failed to connect to catalog: {e}"))
576            })?;
577
578        // If we can list namespaces, the connection is valid.
579        catalog.list_namespaces(None).await.map_err(|e| {
580            ConnectionValidationError::Other(anyhow!("failed to list namespaces: {e}"))
581        })?;
582
583        Ok(())
584    }
585}
586
/// A connection to an AWS PrivateLink endpoint service.
///
/// Every field may be changed when the connection is altered (see its
/// `AlterCompatible` impl).
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelinkConnection {
    /// The name of the PrivateLink endpoint service.
    pub service_name: String,
    /// The availability zones to use for the endpoint.
    // NOTE(review): presumably AWS AZ IDs; confirm against the planner.
    pub availability_zones: Vec<String>,
}
592
593impl AlterCompatible for AwsPrivatelinkConnection {
594    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
595        // Every element of the AwsPrivatelinkConnection connection is configurable.
596        Ok(())
597    }
598}
599
/// TLS settings for a [`KafkaConnection`].
///
/// Its presence selects the `SSL` or `SASL_SSL` security protocol.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaTlsConfig {
    /// Optional client identity, sent as `ssl.key.pem` / `ssl.certificate.pem`.
    pub identity: Option<TlsIdentity>,
    /// Optional root certificate, sent as `ssl.ca.pem`.
    pub root_cert: Option<StringOrSecret>,
}
605
/// SASL authentication settings for a [`KafkaConnection`].
///
/// Its presence selects the `SASL_PLAINTEXT` or `SASL_SSL` security protocol.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaSaslConfig<C: ConnectionAccess = InlinedConnection> {
    /// The SASL mechanism, sent as `sasl.mechanisms`.
    pub mechanism: String,
    /// The SASL username, sent as `sasl.username`.
    pub username: StringOrSecret,
    /// The catalog ID of the secret holding the password, sent as `sasl.password`.
    pub password: Option<CatalogItemId>,
    /// An optional AWS connection.
    // NOTE(review): presumably used for AWS IAM-based SASL auth; confirm.
    pub aws: Option<AwsConnectionReference<C>>,
}
613
614impl<R: ConnectionResolver> IntoInlineConnection<KafkaSaslConfig, R>
615    for KafkaSaslConfig<ReferencedConnection>
616{
617    fn into_inline_connection(self, r: R) -> KafkaSaslConfig {
618        KafkaSaslConfig {
619            mechanism: self.mechanism,
620            username: self.username,
621            password: self.password,
622            aws: self.aws.map(|aws| aws.into_inline_connection(&r)),
623        }
624    }
625}
626
/// Specifies a Kafka broker in a [`KafkaConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaBroker<C: ConnectionAccess = InlinedConnection> {
    /// The address of the Kafka broker. Broker addresses are joined with `,`
    /// to form `bootstrap.servers`.
    pub address: String,
    /// An optional tunnel to use when connecting to the broker.
    pub tunnel: Tunnel<C>,
}
635
636impl<R: ConnectionResolver> IntoInlineConnection<KafkaBroker, R>
637    for KafkaBroker<ReferencedConnection>
638{
639    fn into_inline_connection(self, r: R) -> KafkaBroker {
640        let KafkaBroker { address, tunnel } = self;
641        KafkaBroker {
642            address,
643            tunnel: tunnel.into_inline_connection(r),
644        }
645    }
646}
647
/// Options for a Kafka topic that is created on the user's behalf, such as
/// [`KafkaConnection::progress_topic_options`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Default)]
pub struct KafkaTopicOptions {
    /// The replication factor for the topic.
    /// If `None`, the broker default will be used.
    pub replication_factor: Option<NonNeg<i32>>,
    /// The number of partitions to create.
    /// If `None`, the broker default will be used.
    pub partition_count: Option<NonNeg<i32>>,
    /// The initial configuration parameters for the topic.
    pub topic_config: BTreeMap<String, String>,
}
659
/// A connection to a Kafka cluster.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaConnection<C: ConnectionAccess = InlinedConnection> {
    /// The bootstrap brokers. These must be empty when `default_tunnel` is an
    /// AWS PrivateLink tunnel; `create_with_context` asserts this.
    pub brokers: Vec<KafkaBroker<C>>,
    /// A tunnel through which to route traffic,
    /// that can be overridden for individual brokers
    /// in `brokers`.
    pub default_tunnel: Tunnel<C>,
    /// An explicit progress topic name. If `None`, a name is derived by
    /// `progress_topic`.
    pub progress_topic: Option<String>,
    /// Options used for the progress topic.
    pub progress_topic_options: KafkaTopicOptions,
    /// Additional client configuration options, copied into the client config.
    pub options: BTreeMap<String, StringOrSecret>,
    /// TLS settings; their presence enables SSL.
    pub tls: Option<KafkaTlsConfig>,
    /// SASL settings; their presence enables SASL.
    pub sasl: Option<KafkaSaslConfig<C>>,
}
673
674impl<R: ConnectionResolver> IntoInlineConnection<KafkaConnection, R>
675    for KafkaConnection<ReferencedConnection>
676{
677    fn into_inline_connection(self, r: R) -> KafkaConnection {
678        let KafkaConnection {
679            brokers,
680            progress_topic,
681            progress_topic_options,
682            default_tunnel,
683            options,
684            tls,
685            sasl,
686        } = self;
687
688        let brokers = brokers
689            .into_iter()
690            .map(|broker| broker.into_inline_connection(&r))
691            .collect();
692
693        KafkaConnection {
694            brokers,
695            progress_topic,
696            progress_topic_options,
697            default_tunnel: default_tunnel.into_inline_connection(&r),
698            options,
699            tls,
700            sasl: sasl.map(|sasl| sasl.into_inline_connection(&r)),
701        }
702    }
703}
704
705impl<C: ConnectionAccess> KafkaConnection<C> {
706    /// Returns the name of the progress topic to use for the connection.
707    ///
708    /// The caller is responsible for providing the connection ID as it is not
709    /// known to `KafkaConnection`.
710    pub fn progress_topic(
711        &self,
712        connection_context: &ConnectionContext,
713        connection_id: CatalogItemId,
714    ) -> Cow<'_, str> {
715        if let Some(progress_topic) = &self.progress_topic {
716            Cow::Borrowed(progress_topic)
717        } else {
718            Cow::Owned(format!(
719                "_materialize-progress-{}-{}",
720                connection_context.environment_id, connection_id,
721            ))
722        }
723    }
724
725    fn validate_by_default(&self) -> bool {
726        true
727    }
728}
729
730impl KafkaConnection {
731    /// Generates a string that can be used as the base for a configuration ID
732    /// (e.g., `client.id`, `group.id`, `transactional.id`) for a Kafka source
733    /// or sink.
734    pub fn id_base(
735        connection_context: &ConnectionContext,
736        connection_id: CatalogItemId,
737        object_id: GlobalId,
738    ) -> String {
739        format!(
740            "materialize-{}-{}-{}",
741            connection_context.environment_id, connection_id, object_id,
742        )
743    }
744
745    /// Enriches the provided `client_id` according to any enrichment rules in
746    /// the `kafka_client_id_enrichment_rules` configuration parameter.
747    pub fn enrich_client_id(&self, configs: &ConfigSet, client_id: &mut String) {
748        #[derive(Debug, Deserialize)]
749        struct EnrichmentRule {
750            #[serde(deserialize_with = "deserialize_regex")]
751            pattern: Regex,
752            payload: String,
753        }
754
755        fn deserialize_regex<'de, D>(deserializer: D) -> Result<Regex, D::Error>
756        where
757            D: Deserializer<'de>,
758        {
759            let buf = String::deserialize(deserializer)?;
760            Regex::new(&buf).map_err(serde::de::Error::custom)
761        }
762
763        let rules = KAFKA_CLIENT_ID_ENRICHMENT_RULES.get(configs);
764        let rules = match serde_json::from_value::<Vec<EnrichmentRule>>(rules) {
765            Ok(rules) => rules,
766            Err(e) => {
767                warn!(%e, "failed to decode kafka_client_id_enrichment_rules");
768                return;
769            }
770        };
771
772        // Check every rule against every broker. Rules are matched in the order
773        // that they are specified. It is usually a configuration error if
774        // multiple rules match the same list of Kafka brokers, but we
775        // nonetheless want to provide well defined semantics.
776        debug!(?self.brokers, "evaluating client ID enrichment rules");
777        for rule in rules {
778            let is_match = self
779                .brokers
780                .iter()
781                .any(|b| rule.pattern.is_match(&b.address));
782            debug!(?rule, is_match, "evaluated client ID enrichment rule");
783            if is_match {
784                client_id.push('-');
785                client_id.push_str(&rule.payload);
786            }
787        }
788    }
789
790    /// Creates a Kafka client for the connection.
791    pub async fn create_with_context<C, T>(
792        &self,
793        storage_configuration: &StorageConfiguration,
794        context: C,
795        extra_options: &BTreeMap<&str, String>,
796        in_task: InTask,
797    ) -> Result<T, ContextCreationError>
798    where
799        C: ClientContext,
800        T: FromClientConfigAndContext<TunnelingClientContext<C>>,
801    {
802        let mut options = self.options.clone();
803
804        // Ensure that Kafka topics are *not* automatically created when
805        // consuming, producing, or fetching metadata for a topic. This ensures
806        // that we don't accidentally create topics with the wrong number of
807        // partitions.
808        options.insert("allow.auto.create.topics".into(), "false".into());
809
810        let brokers = match &self.default_tunnel {
811            Tunnel::AwsPrivatelink(t) => {
812                assert!(&self.brokers.is_empty());
813
814                let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM
815                    .get(storage_configuration.config_set());
816                options.insert("ssl.endpoint.identification.algorithm".into(), algo.into());
817
818                // When using a default privatelink tunnel broker/brokers cannot be specified
819                // instead the tunnel connection_id and port are used for the initial connection.
820                format!(
821                    "{}:{}",
822                    vpc_endpoint_host(
823                        t.connection_id,
824                        None, // Default tunnel does not support availability zones.
825                    ),
826                    t.port.unwrap_or(9092)
827                )
828            }
829            _ => self.brokers.iter().map(|b| &b.address).join(","),
830        };
831        options.insert("bootstrap.servers".into(), brokers.into());
832        let security_protocol = match (self.tls.is_some(), self.sasl.is_some()) {
833            (false, false) => "PLAINTEXT",
834            (true, false) => "SSL",
835            (false, true) => "SASL_PLAINTEXT",
836            (true, true) => "SASL_SSL",
837        };
838        options.insert("security.protocol".into(), security_protocol.into());
839        if let Some(tls) = &self.tls {
840            if let Some(root_cert) = &tls.root_cert {
841                options.insert("ssl.ca.pem".into(), root_cert.clone());
842            }
843            if let Some(identity) = &tls.identity {
844                options.insert("ssl.key.pem".into(), StringOrSecret::Secret(identity.key));
845                options.insert("ssl.certificate.pem".into(), identity.cert.clone());
846            }
847        }
848        if let Some(sasl) = &self.sasl {
849            options.insert("sasl.mechanisms".into(), (&sasl.mechanism).into());
850            options.insert("sasl.username".into(), sasl.username.clone());
851            if let Some(password) = sasl.password {
852                options.insert("sasl.password".into(), StringOrSecret::Secret(password));
853            }
854        }
855
856        options.insert(
857            "retry.backoff.ms".into(),
858            KAFKA_RETRY_BACKOFF
859                .get(storage_configuration.config_set())
860                .as_millis()
861                .into(),
862        );
863        options.insert(
864            "retry.backoff.max.ms".into(),
865            KAFKA_RETRY_BACKOFF_MAX
866                .get(storage_configuration.config_set())
867                .as_millis()
868                .into(),
869        );
870        options.insert(
871            "reconnect.backoff.ms".into(),
872            KAFKA_RECONNECT_BACKOFF
873                .get(storage_configuration.config_set())
874                .as_millis()
875                .into(),
876        );
877        options.insert(
878            "reconnect.backoff.max.ms".into(),
879            KAFKA_RECONNECT_BACKOFF_MAX
880                .get(storage_configuration.config_set())
881                .as_millis()
882                .into(),
883        );
884
885        let mut config = mz_kafka_util::client::create_new_client_config(
886            storage_configuration
887                .connection_context
888                .librdkafka_log_level,
889            storage_configuration.parameters.kafka_timeout_config,
890        );
891        for (k, v) in options {
892            config.set(
893                k,
894                v.get_string(
895                    in_task,
896                    &storage_configuration.connection_context.secrets_reader,
897                )
898                .await
899                .context("reading kafka secret")?,
900            );
901        }
902        for (k, v) in extra_options {
903            config.set(*k, v);
904        }
905
906        let aws_config = match self.sasl.as_ref().and_then(|sasl| sasl.aws.as_ref()) {
907            None => None,
908            Some(aws) => Some(
909                aws.connection
910                    .load_sdk_config(
911                        &storage_configuration.connection_context,
912                        aws.connection_id,
913                        in_task,
914                    )
915                    .await?,
916            ),
917        };
918
919        // TODO(roshan): Implement enforcement of external address validation once
920        // rdkafka client has been updated to support providing multiple resolved
921        // addresses for brokers
922        let mut context = TunnelingClientContext::new(
923            context,
924            Handle::current(),
925            storage_configuration
926                .connection_context
927                .ssh_tunnel_manager
928                .clone(),
929            storage_configuration.parameters.ssh_timeout_config,
930            aws_config,
931            in_task,
932        );
933
934        match &self.default_tunnel {
935            Tunnel::Direct => {
936                // By default, don't offer a default override for broker address lookup.
937            }
938            Tunnel::AwsPrivatelink(pl) => {
939                context.set_default_tunnel(TunnelConfig::StaticHost(vpc_endpoint_host(
940                    pl.connection_id,
941                    None, // Default tunnel does not support availability zones.
942                )));
943            }
944            Tunnel::Ssh(ssh_tunnel) => {
945                let secret = storage_configuration
946                    .connection_context
947                    .secrets_reader
948                    .read_in_task_if(in_task, ssh_tunnel.connection_id)
949                    .await?;
950                let key_pair = SshKeyPair::from_bytes(&secret)?;
951
952                // Ensure any ssh-bastion address we connect to is resolved to an external address.
953                let resolved = resolve_address(
954                    &ssh_tunnel.connection.host,
955                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
956                )
957                .await?;
958                context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig {
959                    host: resolved
960                        .iter()
961                        .map(|a| a.to_string())
962                        .collect::<BTreeSet<_>>(),
963                    port: ssh_tunnel.connection.port,
964                    user: ssh_tunnel.connection.user.clone(),
965                    key_pair,
966                }));
967            }
968        }
969
970        for broker in &self.brokers {
971            let mut addr_parts = broker.address.splitn(2, ':');
972            let addr = BrokerAddr {
973                host: addr_parts
974                    .next()
975                    .context("BROKER is not address:port")?
976                    .into(),
977                port: addr_parts
978                    .next()
979                    .unwrap_or("9092")
980                    .parse()
981                    .context("parsing BROKER port")?,
982            };
983            match &broker.tunnel {
984                Tunnel::Direct => {
985                    // By default, don't override broker address lookup.
986                    //
987                    // N.B.
988                    //
989                    // We _could_ pre-setup the default ssh tunnel for all known brokers here, but
990                    // we avoid doing because:
991                    // - Its not necessary.
992                    // - Not doing so makes it easier to test the `FailedDefaultSshTunnel` path
993                    // in the `TunnelingClientContext`.
994                }
995                Tunnel::AwsPrivatelink(aws_privatelink) => {
996                    let host = mz_cloud_resources::vpc_endpoint_host(
997                        aws_privatelink.connection_id,
998                        aws_privatelink.availability_zone.as_deref(),
999                    );
1000                    let port = aws_privatelink.port;
1001                    context.add_broker_rewrite(
1002                        addr,
1003                        BrokerRewrite {
1004                            host: host.clone(),
1005                            port,
1006                        },
1007                    );
1008                }
1009                Tunnel::Ssh(ssh_tunnel) => {
1010                    // Ensure any SSH bastion address we connect to is resolved to an external address.
1011                    let ssh_host_resolved = resolve_address(
1012                        &ssh_tunnel.connection.host,
1013                        ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1014                    )
1015                    .await?;
1016                    context
1017                        .add_ssh_tunnel(
1018                            addr,
1019                            SshTunnelConfig {
1020                                host: ssh_host_resolved
1021                                    .iter()
1022                                    .map(|a| a.to_string())
1023                                    .collect::<BTreeSet<_>>(),
1024                                port: ssh_tunnel.connection.port,
1025                                user: ssh_tunnel.connection.user.clone(),
1026                                key_pair: SshKeyPair::from_bytes(
1027                                    &storage_configuration
1028                                        .connection_context
1029                                        .secrets_reader
1030                                        .read_in_task_if(in_task, ssh_tunnel.connection_id)
1031                                        .await?,
1032                                )?,
1033                            },
1034                        )
1035                        .await
1036                        .map_err(ContextCreationError::Ssh)?;
1037                }
1038            }
1039        }
1040
1041        Ok(config.create_with_context(context)?)
1042    }
1043
1044    async fn validate(
1045        &self,
1046        _id: CatalogItemId,
1047        storage_configuration: &StorageConfiguration,
1048    ) -> Result<(), anyhow::Error> {
1049        let (context, error_rx) = MzClientContext::with_errors();
1050        let consumer: BaseConsumer<_> = self
1051            .create_with_context(
1052                storage_configuration,
1053                context,
1054                &BTreeMap::new(),
1055                // We are in a normal tokio context during validation, already.
1056                InTask::No,
1057            )
1058            .await?;
1059        let consumer = Arc::new(consumer);
1060
1061        let timeout = storage_configuration
1062            .parameters
1063            .kafka_timeout_config
1064            .fetch_metadata_timeout;
1065
1066        // librdkafka doesn't expose an API for determining whether a connection to
1067        // the Kafka cluster has been successfully established. So we make a
1068        // metadata request, though we don't care about the results, so that we can
1069        // report any errors making that request. If the request succeeds, we know
1070        // we were able to contact at least one broker, and that's a good proxy for
1071        // being able to contact all the brokers in the cluster.
1072        //
1073        // The downside of this approach is it produces a generic error message like
1074        // "metadata fetch error" with no additional details. The real networking
1075        // error is buried in the librdkafka logs, which are not visible to users.
1076        let result = mz_ore::task::spawn_blocking(|| "kafka_get_metadata", {
1077            let consumer = Arc::clone(&consumer);
1078            move || consumer.fetch_metadata(None, timeout)
1079        })
1080        .await;
1081        match result {
1082            Ok(_) => Ok(()),
1083            // The error returned by `fetch_metadata` does not provide any details which makes for
1084            // a crappy user facing error message. For this reason we attempt to grab a better
1085            // error message from the client context, which should contain any error logs emitted
1086            // by librdkafka, and fallback to the generic error if there is nothing there.
1087            Err(err) => {
1088                // Multiple errors might have been logged during this validation but some are more
1089                // relevant than others. Specifically, we prefer non-internal errors over internal
1090                // errors since those give much more useful information to the users.
1091                let main_err = error_rx.try_iter().reduce(|cur, new| match cur {
1092                    MzKafkaError::Internal(_) => new,
1093                    _ => cur,
1094                });
1095
1096                // Don't drop the consumer until after we've drained the errors
1097                // channel. Dropping the consumer can introduce spurious errors.
1098                // See database-issues#7432.
1099                drop(consumer);
1100
1101                match main_err {
1102                    Some(err) => Err(err.into()),
1103                    None => Err(err.into()),
1104                }
1105            }
1106        }
1107    }
1108}
1109
1110impl<C: ConnectionAccess> AlterCompatible for KafkaConnection<C> {
1111    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1112        let KafkaConnection {
1113            brokers: _,
1114            default_tunnel: _,
1115            progress_topic,
1116            progress_topic_options,
1117            options: _,
1118            tls: _,
1119            sasl: _,
1120        } = self;
1121
1122        let compatibility_checks = [
1123            (progress_topic == &other.progress_topic, "progress_topic"),
1124            (
1125                progress_topic_options == &other.progress_topic_options,
1126                "progress_topic_options",
1127            ),
1128        ];
1129
1130        for (compatible, field) in compatibility_checks {
1131            if !compatible {
1132                tracing::warn!(
1133                    "KafkaConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1134                    self,
1135                    other
1136                );
1137
1138                return Err(AlterError { id });
1139            }
1140        }
1141
1142        Ok(())
1143    }
1144}
1145
/// A connection to a Confluent Schema Registry.
///
/// The type parameter `C` controls how the tunnel's underlying connection is
/// represented; a `CsrConnection<ReferencedConnection>` is converted into the
/// default (inlined) form via [`IntoInlineConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnection<C: ConnectionAccess = InlinedConnection> {
    /// The URL of the schema registry.
    ///
    /// [`CsrConnection::connect`] fails with "url missing host" if this URL
    /// has no host.
    pub url: Url,
    /// Trusted root TLS certificate in PEM format, given either inline or as a
    /// secret reference.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication with the schema
    /// registry.
    pub tls_identity: Option<TlsIdentity>,
    /// Optional HTTP authentication credentials for the schema registry.
    pub http_auth: Option<CsrConnectionHttpAuth>,
    /// A tunnel through which to route traffic.
    pub tunnel: Tunnel<C>,
}
1161
1162impl<R: ConnectionResolver> IntoInlineConnection<CsrConnection, R>
1163    for CsrConnection<ReferencedConnection>
1164{
1165    fn into_inline_connection(self, r: R) -> CsrConnection {
1166        let CsrConnection {
1167            url,
1168            tls_root_cert,
1169            tls_identity,
1170            http_auth,
1171            tunnel,
1172        } = self;
1173        CsrConnection {
1174            url,
1175            tls_root_cert,
1176            tls_identity,
1177            http_auth,
1178            tunnel: tunnel.into_inline_connection(r),
1179        }
1180    }
1181}
1182
impl<C: ConnectionAccess> CsrConnection<C> {
    /// Reports whether this connection is validated by default.
    ///
    /// Always `true` for schema registry connections.
    fn validate_by_default(&self) -> bool {
        true
    }
}
1188
1189impl CsrConnection {
1190    /// Constructs a schema registry client from the connection.
1191    pub async fn connect(
1192        &self,
1193        storage_configuration: &StorageConfiguration,
1194        in_task: InTask,
1195    ) -> Result<mz_ccsr::Client, CsrConnectError> {
1196        let mut client_config = mz_ccsr::ClientConfig::new(self.url.clone());
1197        if let Some(root_cert) = &self.tls_root_cert {
1198            let root_cert = root_cert
1199                .get_string(
1200                    in_task,
1201                    &storage_configuration.connection_context.secrets_reader,
1202                )
1203                .await?;
1204            let root_cert = Certificate::from_pem(root_cert.as_bytes())?;
1205            client_config = client_config.add_root_certificate(root_cert);
1206        }
1207
1208        if let Some(tls_identity) = &self.tls_identity {
1209            let key = &storage_configuration
1210                .connection_context
1211                .secrets_reader
1212                .read_string_in_task_if(in_task, tls_identity.key)
1213                .await?;
1214            let cert = tls_identity
1215                .cert
1216                .get_string(
1217                    in_task,
1218                    &storage_configuration.connection_context.secrets_reader,
1219                )
1220                .await?;
1221            let ident = Identity::from_pem(key.as_bytes(), cert.as_bytes())?;
1222            client_config = client_config.identity(ident);
1223        }
1224
1225        if let Some(http_auth) = &self.http_auth {
1226            let username = http_auth
1227                .username
1228                .get_string(
1229                    in_task,
1230                    &storage_configuration.connection_context.secrets_reader,
1231                )
1232                .await?;
1233            let password = match http_auth.password {
1234                None => None,
1235                Some(password) => Some(
1236                    storage_configuration
1237                        .connection_context
1238                        .secrets_reader
1239                        .read_string_in_task_if(in_task, password)
1240                        .await?,
1241                ),
1242            };
1243            client_config = client_config.auth(username, password);
1244        }
1245
1246        // TODO: use types to enforce that the URL has a string hostname.
1247        let host = self
1248            .url
1249            .host_str()
1250            .ok_or_else(|| anyhow!("url missing host"))?;
1251        match &self.tunnel {
1252            Tunnel::Direct => {
1253                // Ensure any host we connect to is resolved to an external address.
1254                let resolved = resolve_address(
1255                    host,
1256                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1257                )
1258                .await?;
1259                client_config = client_config.resolve_to_addrs(
1260                    host,
1261                    &resolved
1262                        .iter()
1263                        .map(|addr| SocketAddr::new(*addr, DUMMY_DNS_PORT))
1264                        .collect::<Vec<_>>(),
1265                )
1266            }
1267            Tunnel::Ssh(ssh_tunnel) => {
1268                let ssh_tunnel = ssh_tunnel
1269                    .connect(
1270                        storage_configuration,
1271                        host,
1272                        // Default to the default http port, but this
1273                        // could default to 8081...
1274                        self.url.port().unwrap_or(80),
1275                        in_task,
1276                    )
1277                    .await
1278                    .map_err(CsrConnectError::Ssh)?;
1279
1280                // Carefully inject the SSH tunnel into the client
1281                // configuration. This is delicate because we need TLS
1282                // verification to continue to use the remote hostname rather
1283                // than the tunnel hostname.
1284
1285                client_config = client_config
1286                    // `resolve_to_addrs` allows us to rewrite the hostname
1287                    // at the DNS level, which means the TCP connection is
1288                    // correctly routed through the tunnel, but TLS verification
1289                    // is still performed against the remote hostname.
1290                    // Unfortunately the port here is ignored...
1291                    .resolve_to_addrs(
1292                        host,
1293                        &[SocketAddr::new(
1294                            ssh_tunnel.local_addr().ip(),
1295                            DUMMY_DNS_PORT,
1296                        )],
1297                    )
1298                    // ...so we also dynamically rewrite the URL to use the
1299                    // current port for the SSH tunnel.
1300                    //
1301                    // WARNING: this is brittle, because we only dynamically
1302                    // update the client configuration with the tunnel *port*,
1303                    // and not the hostname This works fine in practice, because
1304                    // only the SSH tunnel port will change if the tunnel fails
1305                    // and has to be restarted (the hostname is always
1306                    // 127.0.0.1)--but this is an an implementation detail of
1307                    // the SSH tunnel code that we're relying on.
1308                    .dynamic_url({
1309                        let remote_url = self.url.clone();
1310                        move || {
1311                            let mut url = remote_url.clone();
1312                            url.set_port(Some(ssh_tunnel.local_addr().port()))
1313                                .expect("cannot fail");
1314                            url
1315                        }
1316                    });
1317            }
1318            Tunnel::AwsPrivatelink(connection) => {
1319                assert_none!(connection.port);
1320
1321                let privatelink_host = mz_cloud_resources::vpc_endpoint_host(
1322                    connection.connection_id,
1323                    connection.availability_zone.as_deref(),
1324                );
1325                let addrs: Vec<_> = net::lookup_host((privatelink_host, DUMMY_DNS_PORT))
1326                    .await
1327                    .context("resolving PrivateLink host")?
1328                    .collect();
1329                client_config = client_config.resolve_to_addrs(host, &addrs)
1330            }
1331        }
1332
1333        Ok(client_config.build()?)
1334    }
1335
1336    async fn validate(
1337        &self,
1338        _id: CatalogItemId,
1339        storage_configuration: &StorageConfiguration,
1340    ) -> Result<(), anyhow::Error> {
1341        let client = self
1342            .connect(
1343                storage_configuration,
1344                // We are in a normal tokio context during validation, already.
1345                InTask::No,
1346            )
1347            .await?;
1348        client.list_subjects().await?;
1349        Ok(())
1350    }
1351}
1352
1353impl<C: ConnectionAccess> AlterCompatible for CsrConnection<C> {
1354    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1355        let CsrConnection {
1356            tunnel,
1357            // All non-tunnel fields may change
1358            url: _,
1359            tls_root_cert: _,
1360            tls_identity: _,
1361            http_auth: _,
1362        } = self;
1363
1364        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1365
1366        for (compatible, field) in compatibility_checks {
1367            if !compatible {
1368                tracing::warn!(
1369                    "CsrConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1370                    self,
1371                    other
1372                );
1373
1374                return Err(AlterError { id });
1375            }
1376        }
1377        Ok(())
1378    }
1379}
1380
/// A TLS key pair used for client identity.
///
/// The certificate may be supplied inline or as a secret reference, whereas
/// the private key is always stored as a secret.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct TlsIdentity {
    /// The client's TLS public certificate in PEM format.
    pub cert: StringOrSecret,
    /// The ID of the secret containing the client's TLS private key in PEM
    /// format.
    pub key: CatalogItemId,
}
1390
/// HTTP authentication credentials in a [`CsrConnection`].
///
/// The username may be supplied inline or as a secret reference, whereas the
/// password, when present, is always stored as a secret.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnectionHttpAuth {
    /// The username.
    pub username: StringOrSecret,
    /// The ID of the secret containing the password, if any.
    pub password: Option<CatalogItemId>,
}
1399
/// A connection to a PostgreSQL server.
///
/// The type parameter `C` controls how the tunnel's underlying connection is
/// represented; a `PostgresConnection<ReferencedConnection>` is converted into
/// the default (inlined) form via [`IntoInlineConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct PostgresConnection<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// The name of the database to connect to.
    pub database: String,
    /// The username to authenticate as, given either inline or as a secret
    /// reference.
    pub user: StringOrSecret,
    /// The ID of the secret containing the password for authentication, if
    /// any.
    pub password: Option<CatalogItemId>,
    /// A tunnel through which to route traffic.
    pub tunnel: Tunnel<C>,
    /// Whether to use TLS for encryption, authentication, or both.
    pub tls_mode: SslMode,
    /// An optional root TLS certificate in PEM format, to verify the server's
    /// identity.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication.
    pub tls_identity: Option<TlsIdentity>,
}
1423
1424impl<R: ConnectionResolver> IntoInlineConnection<PostgresConnection, R>
1425    for PostgresConnection<ReferencedConnection>
1426{
1427    fn into_inline_connection(self, r: R) -> PostgresConnection {
1428        let PostgresConnection {
1429            host,
1430            port,
1431            database,
1432            user,
1433            password,
1434            tunnel,
1435            tls_mode,
1436            tls_root_cert,
1437            tls_identity,
1438        } = self;
1439
1440        PostgresConnection {
1441            host,
1442            port,
1443            database,
1444            user,
1445            password,
1446            tunnel: tunnel.into_inline_connection(r),
1447            tls_mode,
1448            tls_root_cert,
1449            tls_identity,
1450        }
1451    }
1452}
1453
impl<C: ConnectionAccess> PostgresConnection<C> {
    /// Reports whether this connection is validated by default.
    ///
    /// Always `true` for PostgreSQL connections.
    fn validate_by_default(&self) -> bool {
        true
    }
}
1459
1460impl PostgresConnection<InlinedConnection> {
1461    pub async fn config(
1462        &self,
1463        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
1464        storage_configuration: &StorageConfiguration,
1465        in_task: InTask,
1466    ) -> Result<mz_postgres_util::Config, anyhow::Error> {
1467        let params = &storage_configuration.parameters;
1468
1469        let mut config = tokio_postgres::Config::new();
1470        config
1471            .host(&self.host)
1472            .port(self.port)
1473            .dbname(&self.database)
1474            .user(&self.user.get_string(in_task, secrets_reader).await?)
1475            .ssl_mode(self.tls_mode);
1476        if let Some(password) = self.password {
1477            let password = secrets_reader
1478                .read_string_in_task_if(in_task, password)
1479                .await?;
1480            config.password(password);
1481        }
1482        if let Some(tls_root_cert) = &self.tls_root_cert {
1483            let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
1484            config.ssl_root_cert(tls_root_cert.as_bytes());
1485        }
1486        if let Some(tls_identity) = &self.tls_identity {
1487            let cert = tls_identity
1488                .cert
1489                .get_string(in_task, secrets_reader)
1490                .await?;
1491            let key = secrets_reader
1492                .read_string_in_task_if(in_task, tls_identity.key)
1493                .await?;
1494            config.ssl_cert(cert.as_bytes()).ssl_key(key.as_bytes());
1495        }
1496
1497        if let Some(connect_timeout) = params.pg_source_connect_timeout {
1498            config.connect_timeout(connect_timeout);
1499        }
1500        if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
1501            config.keepalives_retries(keepalives_retries);
1502        }
1503        if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
1504            config.keepalives_idle(keepalives_idle);
1505        }
1506        if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
1507            config.keepalives_interval(keepalives_interval);
1508        }
1509        if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
1510            config.tcp_user_timeout(tcp_user_timeout);
1511        }
1512
1513        let mut options = vec![];
1514        if let Some(wal_sender_timeout) = params.pg_source_wal_sender_timeout {
1515            options.push(format!(
1516                "--wal_sender_timeout={}",
1517                wal_sender_timeout.as_millis()
1518            ));
1519        };
1520        if params.pg_source_tcp_configure_server {
1521            if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
1522                options.push(format!("--tcp_keepalives_count={}", keepalives_retries));
1523            }
1524            if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
1525                options.push(format!(
1526                    "--tcp_keepalives_idle={}",
1527                    keepalives_idle.as_secs()
1528                ));
1529            }
1530            if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
1531                options.push(format!(
1532                    "--tcp_keepalives_interval={}",
1533                    keepalives_interval.as_secs()
1534                ));
1535            }
1536            if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
1537                options.push(format!(
1538                    "--tcp_user_timeout={}",
1539                    tcp_user_timeout.as_millis()
1540                ));
1541            }
1542        }
1543        config.options(options.join(" ").as_str());
1544
1545        let tunnel = match &self.tunnel {
1546            Tunnel::Direct => {
1547                // Ensure any host we connect to is resolved to an external address.
1548                let resolved = resolve_address(
1549                    &self.host,
1550                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1551                )
1552                .await?;
1553                mz_postgres_util::TunnelConfig::Direct {
1554                    resolved_ips: Some(resolved),
1555                }
1556            }
1557            Tunnel::Ssh(SshTunnel {
1558                connection_id,
1559                connection,
1560            }) => {
1561                let secret = secrets_reader
1562                    .read_in_task_if(in_task, *connection_id)
1563                    .await?;
1564                let key_pair = SshKeyPair::from_bytes(&secret)?;
1565                // Ensure any ssh-bastion host we connect to is resolved to an external address.
1566                let resolved = resolve_address(
1567                    &connection.host,
1568                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1569                )
1570                .await?;
1571                mz_postgres_util::TunnelConfig::Ssh {
1572                    config: SshTunnelConfig {
1573                        host: resolved
1574                            .iter()
1575                            .map(|a| a.to_string())
1576                            .collect::<BTreeSet<_>>(),
1577                        port: connection.port,
1578                        user: connection.user.clone(),
1579                        key_pair,
1580                    },
1581                }
1582            }
1583            Tunnel::AwsPrivatelink(connection) => {
1584                assert_none!(connection.port);
1585                mz_postgres_util::TunnelConfig::AwsPrivatelink {
1586                    connection_id: connection.connection_id,
1587                }
1588            }
1589        };
1590
1591        Ok(mz_postgres_util::Config::new(
1592            config,
1593            tunnel,
1594            params.ssh_timeout_config,
1595            in_task,
1596        )?)
1597    }
1598
1599    pub async fn validate(
1600        &self,
1601        _id: CatalogItemId,
1602        storage_configuration: &StorageConfiguration,
1603    ) -> Result<mz_postgres_util::Client, anyhow::Error> {
1604        let config = self
1605            .config(
1606                &storage_configuration.connection_context.secrets_reader,
1607                storage_configuration,
1608                // We are in a normal tokio context during validation, already.
1609                InTask::No,
1610            )
1611            .await?;
1612        let client = config
1613            .connect(
1614                "connection validation",
1615                &storage_configuration.connection_context.ssh_tunnel_manager,
1616            )
1617            .await?;
1618
1619        let wal_level = mz_postgres_util::get_wal_level(&client).await?;
1620
1621        if wal_level < mz_postgres_util::replication::WalLevel::Logical {
1622            Err(PostgresConnectionValidationError::InsufficientWalLevel { wal_level })?;
1623        }
1624
1625        let max_wal_senders = mz_postgres_util::get_max_wal_senders(&client).await?;
1626
1627        if max_wal_senders < 1 {
1628            Err(PostgresConnectionValidationError::ReplicationDisabled)?;
1629        }
1630
1631        let available_replication_slots =
1632            mz_postgres_util::available_replication_slots(&client).await?;
1633
1634        // We need 1 replication slot for the snapshots and 1 for the continuing replication
1635        if available_replication_slots < 2 {
1636            Err(
1637                PostgresConnectionValidationError::InsufficientReplicationSlotsAvailable {
1638                    count: 2,
1639                },
1640            )?;
1641        }
1642
1643        Ok(client)
1644    }
1645}
1646
/// Reasons an upstream PostgreSQL server can fail connection validation.
///
/// `detail()` and `hint()` on this type supply extra user-facing context for
/// some variants.
#[derive(Debug, Clone, thiserror::Error)]
pub enum PostgresConnectionValidationError {
    /// Fewer replication slots are free than the operation needs; `count` is
    /// the number of slots required.
    #[error("PostgreSQL server has insufficient number of replication slots available")]
    InsufficientReplicationSlotsAvailable { count: usize },
    /// The server's `wal_level` is below `logical`.
    #[error("server must have wal_level >= logical, but has {wal_level}")]
    InsufficientWalLevel {
        /// The `wal_level` reported by the server.
        wal_level: mz_postgres_util::replication::WalLevel,
    },
    /// Replication is disabled on the server (`max_wal_senders` is below 1).
    #[error("replication disabled on server")]
    ReplicationDisabled,
}
1658
1659impl PostgresConnectionValidationError {
1660    pub fn detail(&self) -> Option<String> {
1661        match self {
1662            Self::InsufficientReplicationSlotsAvailable { count } => Some(format!(
1663                "executing this statement requires {} replication slot{}",
1664                count,
1665                if *count == 1 { "" } else { "s" }
1666            )),
1667            _ => None,
1668        }
1669    }
1670
1671    pub fn hint(&self) -> Option<String> {
1672        match self {
1673            Self::InsufficientReplicationSlotsAvailable { .. } => Some(
1674                "you might be able to wait for other sources to finish snapshotting and try again"
1675                    .into(),
1676            ),
1677            Self::ReplicationDisabled => Some("set max_wal_senders to a value > 0".into()),
1678            _ => None,
1679        }
1680    }
1681}
1682
1683impl<C: ConnectionAccess> AlterCompatible for PostgresConnection<C> {
1684    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1685        let PostgresConnection {
1686            tunnel,
1687            // All non-tunnel options may change arbitrarily
1688            host: _,
1689            port: _,
1690            database: _,
1691            user: _,
1692            password: _,
1693            tls_mode: _,
1694            tls_root_cert: _,
1695            tls_identity: _,
1696        } = self;
1697
1698        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1699
1700        for (compatible, field) in compatibility_checks {
1701            if !compatible {
1702                tracing::warn!(
1703                    "PostgresConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1704                    self,
1705                    other
1706                );
1707
1708                return Err(AlterError { id });
1709            }
1710        }
1711        Ok(())
1712    }
1713}
1714
/// Specifies how to tunnel a connection.
///
/// `C` selects whether an SSH tunnel refers to its SSH connection by reference
/// or carries it inlined; the `IntoInlineConnection` impl for this type
/// converts the former into the latter.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Tunnel<C: ConnectionAccess = InlinedConnection> {
    /// No tunneling.
    Direct,
    /// Via the specified SSH tunnel connection.
    Ssh(SshTunnel<C>),
    /// Via the specified AWS PrivateLink connection.
    AwsPrivatelink(AwsPrivatelink),
}
1725
1726impl<R: ConnectionResolver> IntoInlineConnection<Tunnel, R> for Tunnel<ReferencedConnection> {
1727    fn into_inline_connection(self, r: R) -> Tunnel {
1728        match self {
1729            Tunnel::Direct => Tunnel::Direct,
1730            Tunnel::Ssh(ssh) => Tunnel::Ssh(ssh.into_inline_connection(r)),
1731            Tunnel::AwsPrivatelink(awspl) => Tunnel::AwsPrivatelink(awspl),
1732        }
1733    }
1734}
1735
1736impl<C: ConnectionAccess> AlterCompatible for Tunnel<C> {
1737    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1738        let compatible = match (self, other) {
1739            (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o).is_ok(),
1740            (s, o) => s == o,
1741        };
1742
1743        if !compatible {
1744            tracing::warn!(
1745                "Tunnel incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
1746                self,
1747                other
1748            );
1749
1750            return Err(AlterError { id });
1751        }
1752
1753        Ok(())
1754    }
1755}
1756
/// Specifies which MySQL SSL Mode to use:
/// <https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-mode>
/// This is not available as an enum in the mysql-async crate, so we define our own.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum MySqlSslMode {
    /// Do not use TLS.
    Disabled,
    /// Use TLS, but verify neither the server certificate nor its hostname.
    Required,
    /// Use TLS and verify the server certificate, but not its hostname.
    VerifyCa,
    /// Use TLS and verify both the server certificate and its hostname.
    VerifyIdentity,
}
1767
/// A connection to a MySQL server.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct MySqlConnection<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// The username to authenticate as.
    pub user: StringOrSecret,
    /// An optional password for authentication, given as the ID of the secret
    /// that holds it.
    pub password: Option<CatalogItemId>,
    /// A tunnel through which to route traffic.
    pub tunnel: Tunnel<C>,
    /// Whether to use TLS for encryption and whether to verify the server's
    /// certificate and identity.
    pub tls_mode: MySqlSslMode,
    /// An optional root TLS certificate in PEM format, to verify the server's
    /// identity.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication.
    pub tls_identity: Option<TlsIdentity>,
    /// Reference to the AWS connection information to be used for IAM authentication and
    /// assuming AWS roles.
    pub aws_connection: Option<AwsConnectionReference<C>>,
}
1792
1793impl<R: ConnectionResolver> IntoInlineConnection<MySqlConnection, R>
1794    for MySqlConnection<ReferencedConnection>
1795{
1796    fn into_inline_connection(self, r: R) -> MySqlConnection {
1797        let MySqlConnection {
1798            host,
1799            port,
1800            user,
1801            password,
1802            tunnel,
1803            tls_mode,
1804            tls_root_cert,
1805            tls_identity,
1806            aws_connection,
1807        } = self;
1808
1809        MySqlConnection {
1810            host,
1811            port,
1812            user,
1813            password,
1814            tunnel: tunnel.into_inline_connection(&r),
1815            tls_mode,
1816            tls_root_cert,
1817            tls_identity,
1818            aws_connection: aws_connection.map(|aws| aws.into_inline_connection(&r)),
1819        }
1820    }
1821}
1822
impl<C: ConnectionAccess> MySqlConnection<C> {
    /// Always returns `true`.
    ///
    /// NOTE(review): presumably consulted to decide whether a new MySQL
    /// connection is validated by default; confirm against the caller.
    fn validate_by_default(&self) -> bool {
        true
    }
}
1828
1829impl MySqlConnection<InlinedConnection> {
1830    pub async fn config(
1831        &self,
1832        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
1833        storage_configuration: &StorageConfiguration,
1834        in_task: InTask,
1835    ) -> Result<mz_mysql_util::Config, anyhow::Error> {
1836        // TODO(roshan): Set appropriate connection timeouts
1837        let mut opts = mysql_async::OptsBuilder::default()
1838            .ip_or_hostname(&self.host)
1839            .tcp_port(self.port)
1840            .user(Some(&self.user.get_string(in_task, secrets_reader).await?));
1841
1842        if let Some(password) = self.password {
1843            let password = secrets_reader
1844                .read_string_in_task_if(in_task, password)
1845                .await?;
1846            opts = opts.pass(Some(password));
1847        }
1848
1849        // Our `MySqlSslMode` enum matches the official MySQL Client `--ssl-mode` parameter values
1850        // which uses opt-in security features (SSL, CA verification, & Identity verification).
1851        // The mysql_async crate `SslOpts` struct uses an opt-out mechanism for each of these, so
1852        // we need to appropriately disable features to match the intent of each enum value.
1853        let mut ssl_opts = match self.tls_mode {
1854            MySqlSslMode::Disabled => None,
1855            MySqlSslMode::Required => Some(
1856                mysql_async::SslOpts::default()
1857                    .with_danger_accept_invalid_certs(true)
1858                    .with_danger_skip_domain_validation(true),
1859            ),
1860            MySqlSslMode::VerifyCa => {
1861                Some(mysql_async::SslOpts::default().with_danger_skip_domain_validation(true))
1862            }
1863            MySqlSslMode::VerifyIdentity => Some(mysql_async::SslOpts::default()),
1864        };
1865
1866        if matches!(
1867            self.tls_mode,
1868            MySqlSslMode::VerifyCa | MySqlSslMode::VerifyIdentity
1869        ) {
1870            if let Some(tls_root_cert) = &self.tls_root_cert {
1871                let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
1872                ssl_opts = ssl_opts.map(|opts| {
1873                    opts.with_root_certs(vec![tls_root_cert.as_bytes().to_vec().into()])
1874                });
1875            }
1876        }
1877
1878        if let Some(identity) = &self.tls_identity {
1879            let key = secrets_reader
1880                .read_string_in_task_if(in_task, identity.key)
1881                .await?;
1882            let cert = identity.cert.get_string(in_task, secrets_reader).await?;
1883            let Pkcs12Archive { der, pass } =
1884                mz_tls_util::pkcs12der_from_pem(key.as_bytes(), cert.as_bytes())?;
1885
1886            // Add client identity to SSLOpts
1887            ssl_opts = ssl_opts.map(|opts| {
1888                opts.with_client_identity(Some(
1889                    mysql_async::ClientIdentity::new(der.into()).with_password(pass),
1890                ))
1891            });
1892        }
1893
1894        opts = opts.ssl_opts(ssl_opts);
1895
1896        let tunnel = match &self.tunnel {
1897            Tunnel::Direct => {
1898                // Ensure any host we connect to is resolved to an external address.
1899                let resolved = resolve_address(
1900                    &self.host,
1901                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1902                )
1903                .await?;
1904                mz_mysql_util::TunnelConfig::Direct {
1905                    resolved_ips: Some(resolved),
1906                }
1907            }
1908            Tunnel::Ssh(SshTunnel {
1909                connection_id,
1910                connection,
1911            }) => {
1912                let secret = secrets_reader
1913                    .read_in_task_if(in_task, *connection_id)
1914                    .await?;
1915                let key_pair = SshKeyPair::from_bytes(&secret)?;
1916                // Ensure any ssh-bastion host we connect to is resolved to an external address.
1917                let resolved = resolve_address(
1918                    &connection.host,
1919                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1920                )
1921                .await?;
1922                mz_mysql_util::TunnelConfig::Ssh {
1923                    config: SshTunnelConfig {
1924                        host: resolved
1925                            .iter()
1926                            .map(|a| a.to_string())
1927                            .collect::<BTreeSet<_>>(),
1928                        port: connection.port,
1929                        user: connection.user.clone(),
1930                        key_pair,
1931                    },
1932                }
1933            }
1934            Tunnel::AwsPrivatelink(connection) => {
1935                assert_none!(connection.port);
1936                mz_mysql_util::TunnelConfig::AwsPrivatelink {
1937                    connection_id: connection.connection_id,
1938                }
1939            }
1940        };
1941
1942        let aws_config = match self.aws_connection.as_ref() {
1943            None => None,
1944            Some(aws_ref) => Some(
1945                aws_ref
1946                    .connection
1947                    .load_sdk_config(
1948                        &storage_configuration.connection_context,
1949                        aws_ref.connection_id,
1950                        in_task,
1951                    )
1952                    .await?,
1953            ),
1954        };
1955
1956        Ok(mz_mysql_util::Config::new(
1957            opts,
1958            tunnel,
1959            storage_configuration.parameters.ssh_timeout_config,
1960            in_task,
1961            storage_configuration
1962                .parameters
1963                .mysql_source_timeouts
1964                .clone(),
1965            aws_config,
1966        )?)
1967    }
1968
1969    pub async fn validate(
1970        &self,
1971        _id: CatalogItemId,
1972        storage_configuration: &StorageConfiguration,
1973    ) -> Result<MySqlConn, MySqlConnectionValidationError> {
1974        let config = self
1975            .config(
1976                &storage_configuration.connection_context.secrets_reader,
1977                storage_configuration,
1978                // We are in a normal tokio context during validation, already.
1979                InTask::No,
1980            )
1981            .await?;
1982        let mut conn = config
1983            .connect(
1984                "connection validation",
1985                &storage_configuration.connection_context.ssh_tunnel_manager,
1986            )
1987            .await?;
1988
1989        // Check if the MySQL database is configured to allow row-based consistent GTID replication
1990        let mut setting_errors = vec![];
1991        let gtid_res = mz_mysql_util::ensure_gtid_consistency(&mut conn).await;
1992        let binlog_res = mz_mysql_util::ensure_full_row_binlog_format(&mut conn).await;
1993        let order_res = mz_mysql_util::ensure_replication_commit_order(&mut conn).await;
1994        for res in [gtid_res, binlog_res, order_res] {
1995            match res {
1996                Err(MySqlError::InvalidSystemSetting {
1997                    setting,
1998                    expected,
1999                    actual,
2000                }) => {
2001                    setting_errors.push((setting, expected, actual));
2002                }
2003                Err(err) => Err(err)?,
2004                Ok(()) => {}
2005            }
2006        }
2007        if !setting_errors.is_empty() {
2008            Err(MySqlConnectionValidationError::ReplicationSettingsError(
2009                setting_errors,
2010            ))?;
2011        }
2012
2013        Ok(conn)
2014    }
2015}
2016
/// Errors produced while validating a MySQL connection.
#[derive(Debug, thiserror::Error)]
pub enum MySqlConnectionValidationError {
    /// One or more server system settings prevent replication. Each entry is
    /// `(setting, expected, actual)`.
    #[error("Invalid MySQL system replication settings")]
    ReplicationSettingsError(Vec<(String, String, String)>),
    /// An error reported by the MySQL client utilities.
    #[error(transparent)]
    Client(#[from] MySqlError),
    /// Any other error, e.g. from resolving the connection configuration.
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}
2026
2027impl MySqlConnectionValidationError {
2028    pub fn detail(&self) -> Option<String> {
2029        match self {
2030            Self::ReplicationSettingsError(settings) => Some(format!(
2031                "Invalid MySQL system replication settings: {}",
2032                itertools::join(
2033                    settings.iter().map(|(setting, expected, actual)| format!(
2034                        "{}: expected {}, got {}",
2035                        setting, expected, actual
2036                    )),
2037                    "; "
2038                )
2039            )),
2040            _ => None,
2041        }
2042    }
2043
2044    pub fn hint(&self) -> Option<String> {
2045        match self {
2046            Self::ReplicationSettingsError(_) => {
2047                Some("Set the necessary MySQL database system settings.".into())
2048            }
2049            _ => None,
2050        }
2051    }
2052}
2053
2054impl<C: ConnectionAccess> AlterCompatible for MySqlConnection<C> {
2055    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2056        let MySqlConnection {
2057            tunnel,
2058            // All non-tunnel options may change arbitrarily
2059            host: _,
2060            port: _,
2061            user: _,
2062            password: _,
2063            tls_mode: _,
2064            tls_root_cert: _,
2065            tls_identity: _,
2066            aws_connection: _,
2067        } = self;
2068
2069        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2070
2071        for (compatible, field) in compatibility_checks {
2072            if !compatible {
2073                tracing::warn!(
2074                    "MySqlConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2075                    self,
2076                    other
2077                );
2078
2079                return Err(AlterError { id });
2080            }
2081        }
2082        Ok(())
2083    }
2084}
2085
/// Details how to connect to an instance of Microsoft SQL Server.
///
/// For specifics of connecting to SQL Server for purposes of creating a
/// Materialize Source, see [`SqlServerSourceConnection`] which wraps this type.
///
/// [`SqlServerSourceConnection`]: crate::sources::SqlServerSourceConnection
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SqlServerConnectionDetails<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// Database we should connect to.
    pub database: String,
    /// The username to authenticate as.
    pub user: StringOrSecret,
    /// ID of the secret holding the password used for authentication.
    pub password: CatalogItemId,
    /// A tunnel through which to route traffic.
    pub tunnel: Tunnel<C>,
    /// Level of encryption to use for the connection.
    pub encryption: mz_sql_server_util::config::EncryptionLevel,
    /// Certificate validation policy
    pub certificate_validation_policy: mz_sql_server_util::config::CertificateValidationPolicy,
    /// TLS CA certificate in PEM format; needed when
    /// `certificate_validation_policy` is `VerifyCA`.
    pub tls_root_cert: Option<StringOrSecret>,
}
2113
impl<C: ConnectionAccess> SqlServerConnectionDetails<C> {
    /// Always returns `true`.
    ///
    /// NOTE(review): presumably consulted to decide whether a new SQL Server
    /// connection is validated by default; confirm against the caller.
    fn validate_by_default(&self) -> bool {
        true
    }
}
2119
2120impl SqlServerConnectionDetails<InlinedConnection> {
2121    /// Attempts to open a connection to the upstream SQL Server instance.
2122    pub async fn validate(
2123        &self,
2124        _id: CatalogItemId,
2125        storage_configuration: &StorageConfiguration,
2126    ) -> Result<mz_sql_server_util::Client, anyhow::Error> {
2127        let config = self
2128            .resolve_config(
2129                &storage_configuration.connection_context.secrets_reader,
2130                storage_configuration,
2131                InTask::No,
2132            )
2133            .await?;
2134        tracing::debug!(?config, "Validating SQL Server connection");
2135
2136        let mut client = mz_sql_server_util::Client::connect(config).await?;
2137
2138        // Ensure the upstream SQL Server instance is configured to allow CDC.
2139        //
2140        // Run all of the checks necessary and collect the errors to provide the best
2141        // guidance as to which system settings need to be enabled.
2142        let mut replication_errors = vec![];
2143        for error in [
2144            mz_sql_server_util::inspect::ensure_database_cdc_enabled(&mut client).await,
2145            mz_sql_server_util::inspect::ensure_snapshot_isolation_enabled(&mut client).await,
2146            mz_sql_server_util::inspect::ensure_sql_server_agent_running(&mut client).await,
2147        ] {
2148            match error {
2149                Err(mz_sql_server_util::SqlServerError::InvalidSystemSetting {
2150                    name,
2151                    expected,
2152                    actual,
2153                }) => replication_errors.push((name, expected, actual)),
2154                Err(other) => Err(other)?,
2155                Ok(()) => (),
2156            }
2157        }
2158        if !replication_errors.is_empty() {
2159            Err(SqlServerConnectionValidationError::ReplicationSettingsError(replication_errors))?;
2160        }
2161
2162        Ok(client)
2163    }
2164
2165    /// Resolve all of the connection details (e.g. read from the [`SecretsReader`])
2166    /// so the returned [`Config`] can be used to open a connection with the
2167    /// upstream system.
2168    ///
2169    /// The provided [`InTask`] argument determines whether any I/O is run in an
2170    /// [`mz_ore::task`] (i.e. a different thread) or directly in the returned
2171    /// future. The main goal here is to prevent running I/O in timely threads.
2172    ///
2173    /// [`Config`]: mz_sql_server_util::Config
2174    pub async fn resolve_config(
2175        &self,
2176        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
2177        storage_configuration: &StorageConfiguration,
2178        in_task: InTask,
2179    ) -> Result<mz_sql_server_util::Config, anyhow::Error> {
2180        let dyncfg = storage_configuration.config_set();
2181        let mut inner_config = tiberius::Config::new();
2182
2183        // Setup default connection params.
2184        inner_config.host(&self.host);
2185        inner_config.port(self.port);
2186        inner_config.database(self.database.clone());
2187        inner_config.encryption(self.encryption.into());
2188        match self.certificate_validation_policy {
2189            mz_sql_server_util::config::CertificateValidationPolicy::TrustAll => {
2190                inner_config.trust_cert()
2191            }
2192            mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA => {
2193                inner_config.trust_cert_ca_pem(
2194                    self.tls_root_cert
2195                        .as_ref()
2196                        .unwrap()
2197                        .get_string(in_task, secrets_reader)
2198                        .await
2199                        .context("ca certificate")?,
2200                );
2201            }
2202            mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem => (), // no-op
2203        }
2204
2205        inner_config.application_name("materialize");
2206
2207        // Read our auth settings from
2208        let user = self
2209            .user
2210            .get_string(in_task, secrets_reader)
2211            .await
2212            .context("username")?;
2213        let password = secrets_reader
2214            .read_string_in_task_if(in_task, self.password)
2215            .await
2216            .context("password")?;
2217        // TODO(sql_server3): Support other methods of authentication besides
2218        // username and password.
2219        inner_config.authentication(tiberius::AuthMethod::sql_server(user, password));
2220
2221        // Prevent users from probing our internal network ports by trying to
2222        // connect to localhost, or another non-external IP.
2223        let enfoce_external_addresses = ENFORCE_EXTERNAL_ADDRESSES.get(dyncfg);
2224
2225        let tunnel = match &self.tunnel {
2226            Tunnel::Direct => mz_sql_server_util::config::TunnelConfig::Direct,
2227            Tunnel::Ssh(SshTunnel {
2228                connection_id,
2229                connection: ssh_connection,
2230            }) => {
2231                let secret = secrets_reader
2232                    .read_in_task_if(in_task, *connection_id)
2233                    .await
2234                    .context("ssh secret")?;
2235                let key_pair = SshKeyPair::from_bytes(&secret).context("ssh key pair")?;
2236                // Ensure any SSH-bastion host we connect to is resolved to an
2237                // external address.
2238                let addresses = resolve_address(&ssh_connection.host, enfoce_external_addresses)
2239                    .await
2240                    .context("ssh tunnel")?;
2241
2242                let config = SshTunnelConfig {
2243                    host: addresses.into_iter().map(|a| a.to_string()).collect(),
2244                    port: ssh_connection.port,
2245                    user: ssh_connection.user.clone(),
2246                    key_pair,
2247                };
2248                mz_sql_server_util::config::TunnelConfig::Ssh {
2249                    config,
2250                    manager: storage_configuration
2251                        .connection_context
2252                        .ssh_tunnel_manager
2253                        .clone(),
2254                    timeout: storage_configuration.parameters.ssh_timeout_config.clone(),
2255                    host: self.host.clone(),
2256                    port: self.port,
2257                }
2258            }
2259            Tunnel::AwsPrivatelink(private_link_connection) => {
2260                assert_none!(private_link_connection.port);
2261                mz_sql_server_util::config::TunnelConfig::AwsPrivatelink {
2262                    connection_id: private_link_connection.connection_id,
2263                    port: self.port,
2264                }
2265            }
2266        };
2267
2268        Ok(mz_sql_server_util::Config::new(
2269            inner_config,
2270            tunnel,
2271            in_task,
2272        ))
2273    }
2274}
2275
/// Errors produced while validating a SQL Server connection.
#[derive(Debug, Clone, thiserror::Error)]
pub enum SqlServerConnectionValidationError {
    /// One or more server system settings prevent CDC replication. Each entry
    /// is `(setting, expected, actual)`.
    #[error("Invalid SQL Server system replication settings")]
    ReplicationSettingsError(Vec<(String, String, String)>),
}
2281
2282impl SqlServerConnectionValidationError {
2283    pub fn detail(&self) -> Option<String> {
2284        match self {
2285            Self::ReplicationSettingsError(settings) => Some(format!(
2286                "Invalid SQL Server system replication settings: {}",
2287                itertools::join(
2288                    settings.iter().map(|(setting, expected, actual)| format!(
2289                        "{}: expected {}, got {}",
2290                        setting, expected, actual
2291                    )),
2292                    "; "
2293                )
2294            )),
2295        }
2296    }
2297
2298    pub fn hint(&self) -> Option<String> {
2299        match self {
2300            _ => None,
2301        }
2302    }
2303}
2304
2305impl<R: ConnectionResolver> IntoInlineConnection<SqlServerConnectionDetails, R>
2306    for SqlServerConnectionDetails<ReferencedConnection>
2307{
2308    fn into_inline_connection(self, r: R) -> SqlServerConnectionDetails {
2309        let SqlServerConnectionDetails {
2310            host,
2311            port,
2312            database,
2313            user,
2314            password,
2315            tunnel,
2316            encryption,
2317            certificate_validation_policy,
2318            tls_root_cert,
2319        } = self;
2320
2321        SqlServerConnectionDetails {
2322            host,
2323            port,
2324            database,
2325            user,
2326            password,
2327            tunnel: tunnel.into_inline_connection(&r),
2328            encryption,
2329            certificate_validation_policy,
2330            tls_root_cert,
2331        }
2332    }
2333}
2334
2335impl<C: ConnectionAccess> AlterCompatible for SqlServerConnectionDetails<C> {
2336    fn alter_compatible(
2337        &self,
2338        id: mz_repr::GlobalId,
2339        other: &Self,
2340    ) -> Result<(), crate::controller::AlterError> {
2341        let SqlServerConnectionDetails {
2342            tunnel,
2343            // TODO(sql_server2): Figure out how these variables are allowed to change.
2344            host: _,
2345            port: _,
2346            database: _,
2347            user: _,
2348            password: _,
2349            encryption: _,
2350            certificate_validation_policy: _,
2351            tls_root_cert: _,
2352        } = self;
2353
2354        let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2355
2356        for (compatible, field) in compatibility_checks {
2357            if !compatible {
2358                tracing::warn!(
2359                    "SqlServerConnectionDetails incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2360                    self,
2361                    other
2362                );
2363
2364                return Err(AlterError { id });
2365            }
2366        }
2367        Ok(())
2368    }
2369}
2370
/// A connection to an SSH tunnel.
///
/// Every field may be altered (see this type's `AlterCompatible` impl).
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SshConnection {
    /// Hostname of the SSH bastion host.
    pub host: String,
    /// Port of the SSH bastion host.
    pub port: u16,
    /// User to authenticate to the bastion host as.
    pub user: String,
}
2378
2379use self::inline::{
2380    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
2381    ReferencedConnection,
2382};
2383
impl AlterCompatible for SshConnection {
    /// Always compatible: the host, port and user of an SSH connection may all
    /// be altered.
    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        // Every element of the SSH connection is configurable.
        Ok(())
    }
}
2390
/// Specifies an AWS PrivateLink service for a [`Tunnel`].
///
/// When altering, only `connection_id` must stay the same; `availability_zone`
/// and `port` may change.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelink {
    /// The ID of the connection to the AWS PrivateLink service.
    pub connection_id: CatalogItemId,
    /// The availability zone to use when connecting to the AWS PrivateLink service.
    pub availability_zone: Option<String>,
    /// The port to use when connecting to the AWS PrivateLink service, if
    /// different from the port in [`KafkaBroker::address`].
    pub port: Option<u16>,
}
2402
2403impl AlterCompatible for AwsPrivatelink {
2404    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2405        let AwsPrivatelink {
2406            connection_id,
2407            availability_zone: _,
2408            port: _,
2409        } = self;
2410
2411        let compatibility_checks = [(connection_id == &other.connection_id, "connection_id")];
2412
2413        for (compatible, field) in compatibility_checks {
2414            if !compatible {
2415                tracing::warn!(
2416                    "AwsPrivatelink incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2417                    self,
2418                    other
2419                );
2420
2421                return Err(AlterError { id });
2422            }
2423        }
2424
2425        Ok(())
2426    }
2427}
2428
/// Specifies an SSH tunnel connection.
///
/// Generic over `ConnectionAccess`. In the default `InlinedConnection` form,
/// `connection` holds the resolved SSH connection details (host, port, user).
/// In the `ReferencedConnection` form it holds a reference that
/// `into_inline_connection` resolves.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SshTunnel<C: ConnectionAccess = InlinedConnection> {
    /// id of the ssh connection
    ///
    /// Also used as the key when reading the tunnel's SSH private key from
    /// the secrets reader.
    pub connection_id: CatalogItemId,
    /// ssh connection object
    ///
    /// Either a reference or the inlined connection, depending on `C`.
    pub connection: C::Ssh,
}
2437
2438impl<R: ConnectionResolver> IntoInlineConnection<SshTunnel, R> for SshTunnel<ReferencedConnection> {
2439    fn into_inline_connection(self, r: R) -> SshTunnel {
2440        let SshTunnel {
2441            connection,
2442            connection_id,
2443        } = self;
2444
2445        SshTunnel {
2446            connection: r.resolve_connection(connection).unwrap_ssh(),
2447            connection_id,
2448        }
2449    }
2450}
2451
2452impl SshTunnel<InlinedConnection> {
2453    /// Like [`SshTunnelConfig::connect`], but the SSH key is loaded from a
2454    /// secret.
2455    async fn connect(
2456        &self,
2457        storage_configuration: &StorageConfiguration,
2458        remote_host: &str,
2459        remote_port: u16,
2460        in_task: InTask,
2461    ) -> Result<ManagedSshTunnelHandle, anyhow::Error> {
2462        // Ensure any ssh-bastion host we connect to is resolved to an external address.
2463        let resolved = resolve_address(
2464            &self.connection.host,
2465            ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2466        )
2467        .await?;
2468        storage_configuration
2469            .connection_context
2470            .ssh_tunnel_manager
2471            .connect(
2472                SshTunnelConfig {
2473                    host: resolved
2474                        .iter()
2475                        .map(|a| a.to_string())
2476                        .collect::<BTreeSet<_>>(),
2477                    port: self.connection.port,
2478                    user: self.connection.user.clone(),
2479                    key_pair: SshKeyPair::from_bytes(
2480                        &storage_configuration
2481                            .connection_context
2482                            .secrets_reader
2483                            .read_in_task_if(in_task, self.connection_id)
2484                            .await?,
2485                    )?,
2486                },
2487                remote_host,
2488                remote_port,
2489                storage_configuration.parameters.ssh_timeout_config,
2490                in_task,
2491            )
2492            .await
2493    }
2494}
2495
2496impl<C: ConnectionAccess> AlterCompatible for SshTunnel<C> {
2497    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2498        let SshTunnel {
2499            connection_id,
2500            connection,
2501        } = self;
2502
2503        let compatibility_checks = [
2504            (connection_id == &other.connection_id, "connection_id"),
2505            (
2506                connection.alter_compatible(id, &other.connection).is_ok(),
2507                "connection",
2508            ),
2509        ];
2510
2511        for (compatible, field) in compatibility_checks {
2512            if !compatible {
2513                tracing::warn!(
2514                    "SshTunnel incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2515                    self,
2516                    other
2517                );
2518
2519                return Err(AlterError { id });
2520            }
2521        }
2522
2523        Ok(())
2524    }
2525}
2526
2527impl SshConnection {
2528    #[allow(clippy::unused_async)]
2529    async fn validate(
2530        &self,
2531        id: CatalogItemId,
2532        storage_configuration: &StorageConfiguration,
2533    ) -> Result<(), anyhow::Error> {
2534        let secret = storage_configuration
2535            .connection_context
2536            .secrets_reader
2537            .read_in_task_if(
2538                // We are in a normal tokio context during validation, already.
2539                InTask::No,
2540                id,
2541            )
2542            .await?;
2543        let key_pair = SshKeyPair::from_bytes(&secret)?;
2544
2545        // Ensure any ssh-bastion host we connect to is resolved to an external address.
2546        let resolved = resolve_address(
2547            &self.host,
2548            ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2549        )
2550        .await?;
2551
2552        let config = SshTunnelConfig {
2553            host: resolved
2554                .iter()
2555                .map(|a| a.to_string())
2556                .collect::<BTreeSet<_>>(),
2557            port: self.port,
2558            user: self.user.clone(),
2559            key_pair,
2560        };
2561        // Note that we do NOT use the `SshTunnelManager` here, as we want to validate that we
2562        // can actually create a new connection to the ssh bastion, without tunneling.
2563        config
2564            .validate(storage_configuration.parameters.ssh_timeout_config)
2565            .await
2566    }
2567
2568    fn validate_by_default(&self) -> bool {
2569        false
2570    }
2571}
2572
2573impl AwsPrivatelinkConnection {
2574    #[allow(clippy::unused_async)]
2575    async fn validate(
2576        &self,
2577        id: CatalogItemId,
2578        storage_configuration: &StorageConfiguration,
2579    ) -> Result<(), anyhow::Error> {
2580        let Some(ref cloud_resource_reader) = storage_configuration
2581            .connection_context
2582            .cloud_resource_reader
2583        else {
2584            return Err(anyhow!("AWS PrivateLink connections are unsupported"));
2585        };
2586
2587        // No need to optionally run this in a task, as we are just validating from envd.
2588        let status = cloud_resource_reader.read(id).await?;
2589
2590        let availability = status
2591            .conditions
2592            .as_ref()
2593            .and_then(|conditions| conditions.iter().find(|c| c.type_ == "Available"));
2594
2595        match availability {
2596            Some(condition) if condition.status == "True" => Ok(()),
2597            Some(condition) => Err(anyhow!("{}", condition.message)),
2598            None => Err(anyhow!("Endpoint availability is unknown")),
2599        }
2600    }
2601
2602    fn validate_by_default(&self) -> bool {
2603        false
2604    }
2605}