1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::net::SocketAddr;
15use std::sync::Arc;
16
17use anyhow::{Context, anyhow};
18use async_trait::async_trait;
19use aws_credential_types::provider::ProvideCredentials;
20use iceberg::Catalog;
21use iceberg::CatalogBuilder;
22use iceberg::io::{
23 AwsCredential, AwsCredentialLoad, CustomAwsCredentialLoader, S3_ACCESS_KEY_ID,
24 S3_DISABLE_EC2_METADATA, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use iceberg_catalog_rest::{
27 REST_CATALOG_PROP_URI, REST_CATALOG_PROP_WAREHOUSE, RestCatalogBuilder,
28};
29use itertools::Itertools;
30use mz_ccsr::tls::{Certificate, Identity};
31use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceReader, vpc_endpoint_host};
32use mz_dyncfg::ConfigSet;
33use mz_kafka_util::client::{
34 BrokerAddr, BrokerRewrite, MzClientContext, MzKafkaError, TunnelConfig, TunnelingClientContext,
35};
36use mz_mysql_util::{MySqlConn, MySqlError};
37use mz_ore::assert_none;
38use mz_ore::error::ErrorExt;
39use mz_ore::future::{InTask, OreFutureExt};
40use mz_ore::netio::resolve_address;
41use mz_ore::num::NonNeg;
42use mz_repr::{CatalogItemId, GlobalId};
43use mz_secrets::SecretsReader;
44use mz_ssh_util::keys::SshKeyPair;
45use mz_ssh_util::tunnel::SshTunnelConfig;
46use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager};
47use mz_tls_util::Pkcs12Archive;
48use mz_tracing::CloneableEnvFilter;
49use rdkafka::ClientContext;
50use rdkafka::config::FromClientConfigAndContext;
51use rdkafka::consumer::{BaseConsumer, Consumer};
52use regex::Regex;
53use serde::{Deserialize, Deserializer, Serialize};
54use tokio::net;
55use tokio::runtime::Handle;
56use tokio_postgres::config::SslMode;
57use tracing::{debug, warn};
58use url::Url;
59
60use crate::AlterCompatible;
61use crate::configuration::StorageConfiguration;
62use crate::connections::aws::{
63 AwsAuth, AwsConnection, AwsConnectionReference, AwsConnectionValidationError,
64};
65use crate::connections::string_or_secret::StringOrSecret;
66use crate::controller::AlterError;
67use crate::dyncfgs::{
68 ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES,
69 KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM, KAFKA_RECONNECT_BACKOFF,
70 KAFKA_RECONNECT_BACKOFF_MAX, KAFKA_RETRY_BACKOFF, KAFKA_RETRY_BACKOFF_MAX,
71};
72use crate::errors::{ContextCreationError, CsrConnectError};
73
74pub mod aws;
75pub mod inline;
76pub mod string_or_secret;
77
/// Property key for the OAuth scope handed to an Iceberg REST catalog.
const REST_CATALOG_PROP_SCOPE: &str = "scope";
/// Property key for the credential handed to an Iceberg REST catalog.
const REST_CATALOG_PROP_CREDENTIAL: &str = "credential";
80
/// Adapts an AWS SDK credentials provider to the [`AwsCredentialLoad`]
/// interface expected by iceberg's file IO layer.
struct AwsSdkCredentialLoader {
    /// The SDK provider queried for credentials on each load.
    provider: aws_credential_types::provider::SharedCredentialsProvider,
}
93
94impl AwsSdkCredentialLoader {
95 fn new(provider: aws_credential_types::provider::SharedCredentialsProvider) -> Self {
96 Self { provider }
97 }
98}
99
100#[async_trait]
101impl AwsCredentialLoad for AwsSdkCredentialLoader {
102 async fn load_credential(
103 &self,
104 _client: reqwest::Client,
105 ) -> anyhow::Result<Option<AwsCredential>> {
106 let creds = self
107 .provider
108 .provide_credentials()
109 .await
110 .context("failed to load AWS credentials from SDK provider")?;
111
112 Ok(Some(AwsCredential {
113 access_key_id: creds.access_key_id().to_string(),
114 secret_access_key: creds.secret_access_key().to_string(),
115 session_token: creds.session_token().map(|s| s.to_string()),
116 expires_in: creds.expiry().map(|t| t.into()),
117 }))
118 }
119}
120
/// An extension trait for [`SecretsReader`] that optionally runs reads in a
/// separate task.
#[async_trait::async_trait]
trait SecretsReaderExt {
    /// Like `SecretsReader::read`, but run in a task when `in_task` requests it.
    async fn read_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<Vec<u8>, anyhow::Error>;

    /// Like `SecretsReader::read_string`, but run in a task when `in_task`
    /// requests it.
    async fn read_string_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<String, anyhow::Error>;
}
138
139#[async_trait::async_trait]
140impl SecretsReaderExt for Arc<dyn SecretsReader> {
141 async fn read_in_task_if(
142 &self,
143 in_task: InTask,
144 id: CatalogItemId,
145 ) -> Result<Vec<u8>, anyhow::Error> {
146 let sr = Arc::clone(self);
147 async move { sr.read(id).await }
148 .run_in_task_if(in_task, || "secrets_reader_read".to_string())
149 .await
150 }
151 async fn read_string_in_task_if(
152 &self,
153 in_task: InTask,
154 id: CatalogItemId,
155 ) -> Result<String, anyhow::Error> {
156 let sr = Arc::clone(self);
157 async move { sr.read_string(id).await }
158 .run_in_task_if(in_task, || "secrets_reader_read".to_string())
159 .await
160 }
161}
162
/// Extra context passed through when instantiating a connection to an
/// external system. Cheaply cloneable.
#[derive(Debug, Clone)]
pub struct ConnectionContext {
    /// An identifier for the environment in which this process runs; used as
    /// a component of Kafka client IDs and progress topic names.
    pub environment_id: String,
    /// The log level to pass to librdkafka.
    pub librdkafka_log_level: tracing::Level,
    /// Optional prefix for AWS external IDs — presumably applied to
    /// AssumeRole operations; confirm against the AWS connection code.
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    /// Optional ARN of a role involved in AWS connections — see
    /// `crate::connections::aws` for how it is consumed.
    pub aws_connection_role_arn: Option<String>,
    /// Reader for secrets referenced by connections.
    pub secrets_reader: Arc<dyn SecretsReader>,
    /// Reader for cloud resources, when available in this deployment.
    pub cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
    /// Manager for SSH tunnels shared across connections.
    pub ssh_tunnel_manager: SshTunnelManager,
}
190
191impl ConnectionContext {
192 pub fn from_cli_args(
200 environment_id: String,
201 startup_log_level: &CloneableEnvFilter,
202 aws_external_id_prefix: Option<AwsExternalIdPrefix>,
203 aws_connection_role_arn: Option<String>,
204 secrets_reader: Arc<dyn SecretsReader>,
205 cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
206 ) -> ConnectionContext {
207 ConnectionContext {
208 environment_id,
209 librdkafka_log_level: mz_ore::tracing::crate_level(
210 &startup_log_level.clone().into(),
211 "librdkafka",
212 ),
213 aws_external_id_prefix,
214 aws_connection_role_arn,
215 secrets_reader,
216 cloud_resource_reader,
217 ssh_tunnel_manager: SshTunnelManager::default(),
218 }
219 }
220
221 pub fn for_tests(secrets_reader: Arc<dyn SecretsReader>) -> ConnectionContext {
223 ConnectionContext {
224 environment_id: "test-environment-id".into(),
225 librdkafka_log_level: tracing::Level::INFO,
226 aws_external_id_prefix: Some(
227 AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable(
228 "test-aws-external-id-prefix",
229 )
230 .expect("infallible"),
231 ),
232 aws_connection_role_arn: Some(
233 "arn:aws:iam::123456789000:role/MaterializeConnection".into(),
234 ),
235 secrets_reader,
236 cloud_resource_reader: None,
237 ssh_tunnel_manager: SshTunnelManager::default(),
238 }
239 }
240}
241
/// A connection to an external system, in one of the supported flavors.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Connection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaConnection<C>),
    Csr(CsrConnection<C>),
    Postgres(PostgresConnection<C>),
    Ssh(SshConnection),
    Aws(AwsConnection),
    AwsPrivatelink(AwsPrivatelinkConnection),
    MySql(MySqlConnection<C>),
    SqlServer(SqlServerConnectionDetails<C>),
    IcebergCatalog(IcebergCatalogConnection<C>),
}
254
255impl<R: ConnectionResolver> IntoInlineConnection<Connection, R>
256 for Connection<ReferencedConnection>
257{
258 fn into_inline_connection(self, r: R) -> Connection {
259 match self {
260 Connection::Kafka(kafka) => Connection::Kafka(kafka.into_inline_connection(r)),
261 Connection::Csr(csr) => Connection::Csr(csr.into_inline_connection(r)),
262 Connection::Postgres(pg) => Connection::Postgres(pg.into_inline_connection(r)),
263 Connection::Ssh(ssh) => Connection::Ssh(ssh),
264 Connection::Aws(aws) => Connection::Aws(aws),
265 Connection::AwsPrivatelink(awspl) => Connection::AwsPrivatelink(awspl),
266 Connection::MySql(mysql) => Connection::MySql(mysql.into_inline_connection(r)),
267 Connection::SqlServer(sql_server) => {
268 Connection::SqlServer(sql_server.into_inline_connection(r))
269 }
270 Connection::IcebergCatalog(iceberg) => {
271 Connection::IcebergCatalog(iceberg.into_inline_connection(r))
272 }
273 }
274 }
275}
276
277impl<C: ConnectionAccess> Connection<C> {
278 pub fn validate_by_default(&self) -> bool {
280 match self {
281 Connection::Kafka(conn) => conn.validate_by_default(),
282 Connection::Csr(conn) => conn.validate_by_default(),
283 Connection::Postgres(conn) => conn.validate_by_default(),
284 Connection::Ssh(conn) => conn.validate_by_default(),
285 Connection::Aws(conn) => conn.validate_by_default(),
286 Connection::AwsPrivatelink(conn) => conn.validate_by_default(),
287 Connection::MySql(conn) => conn.validate_by_default(),
288 Connection::SqlServer(conn) => conn.validate_by_default(),
289 Connection::IcebergCatalog(conn) => conn.validate_by_default(),
290 }
291 }
292}
293
294impl Connection<InlinedConnection> {
295 pub async fn validate(
297 &self,
298 id: CatalogItemId,
299 storage_configuration: &StorageConfiguration,
300 ) -> Result<(), ConnectionValidationError> {
301 match self {
302 Connection::Kafka(conn) => conn.validate(id, storage_configuration).await?,
303 Connection::Csr(conn) => conn.validate(id, storage_configuration).await?,
304 Connection::Postgres(conn) => {
305 conn.validate(id, storage_configuration).await?;
306 }
307 Connection::Ssh(conn) => conn.validate(id, storage_configuration).await?,
308 Connection::Aws(conn) => conn.validate(id, storage_configuration).await?,
309 Connection::AwsPrivatelink(conn) => conn.validate(id, storage_configuration).await?,
310 Connection::MySql(conn) => {
311 conn.validate(id, storage_configuration).await?;
312 }
313 Connection::SqlServer(conn) => {
314 conn.validate(id, storage_configuration).await?;
315 }
316 Connection::IcebergCatalog(conn) => conn.validate(id, storage_configuration).await?,
317 }
318 Ok(())
319 }
320
321 pub fn unwrap_kafka(self) -> <InlinedConnection as ConnectionAccess>::Kafka {
322 match self {
323 Self::Kafka(conn) => conn,
324 o => unreachable!("{o:?} is not a Kafka connection"),
325 }
326 }
327
328 pub fn unwrap_pg(self) -> <InlinedConnection as ConnectionAccess>::Pg {
329 match self {
330 Self::Postgres(conn) => conn,
331 o => unreachable!("{o:?} is not a Postgres connection"),
332 }
333 }
334
335 pub fn unwrap_mysql(self) -> <InlinedConnection as ConnectionAccess>::MySql {
336 match self {
337 Self::MySql(conn) => conn,
338 o => unreachable!("{o:?} is not a MySQL connection"),
339 }
340 }
341
342 pub fn unwrap_sql_server(self) -> <InlinedConnection as ConnectionAccess>::SqlServer {
343 match self {
344 Self::SqlServer(conn) => conn,
345 o => unreachable!("{o:?} is not a SQL Server connection"),
346 }
347 }
348
349 pub fn unwrap_aws(self) -> <InlinedConnection as ConnectionAccess>::Aws {
350 match self {
351 Self::Aws(conn) => conn,
352 o => unreachable!("{o:?} is not an AWS connection"),
353 }
354 }
355
356 pub fn unwrap_ssh(self) -> <InlinedConnection as ConnectionAccess>::Ssh {
357 match self {
358 Self::Ssh(conn) => conn,
359 o => unreachable!("{o:?} is not an SSH connection"),
360 }
361 }
362
363 pub fn unwrap_csr(self) -> <InlinedConnection as ConnectionAccess>::Csr {
364 match self {
365 Self::Csr(conn) => conn,
366 o => unreachable!("{o:?} is not a Kafka connection"),
367 }
368 }
369
370 pub fn unwrap_iceberg_catalog(self) -> <InlinedConnection as ConnectionAccess>::IcebergCatalog {
371 match self {
372 Self::IcebergCatalog(conn) => conn,
373 o => unreachable!("{o:?} is not an Iceberg catalog connection"),
374 }
375 }
376}
377
/// An error returned by `Connection::validate`; per-flavor validation errors
/// are wrapped transparently, everything else lands in `Other`.
#[derive(thiserror::Error, Debug)]
pub enum ConnectionValidationError {
    #[error(transparent)]
    Postgres(#[from] PostgresConnectionValidationError),
    #[error(transparent)]
    MySql(#[from] MySqlConnectionValidationError),
    #[error(transparent)]
    SqlServer(#[from] SqlServerConnectionValidationError),
    #[error(transparent)]
    Aws(#[from] AwsConnectionValidationError),
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}
392
393impl ConnectionValidationError {
394 pub fn detail(&self) -> Option<String> {
396 match self {
397 ConnectionValidationError::Postgres(e) => e.detail(),
398 ConnectionValidationError::MySql(e) => e.detail(),
399 ConnectionValidationError::SqlServer(e) => e.detail(),
400 ConnectionValidationError::Aws(e) => e.detail(),
401 ConnectionValidationError::Other(_) => None,
402 }
403 }
404
405 pub fn hint(&self) -> Option<String> {
407 match self {
408 ConnectionValidationError::Postgres(e) => e.hint(),
409 ConnectionValidationError::MySql(e) => e.hint(),
410 ConnectionValidationError::SqlServer(e) => e.hint(),
411 ConnectionValidationError::Aws(e) => e.hint(),
412 ConnectionValidationError::Other(_) => None,
413 }
414 }
415}
416
417impl<C: ConnectionAccess> AlterCompatible for Connection<C> {
418 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
419 match (self, other) {
420 (Self::Aws(s), Self::Aws(o)) => s.alter_compatible(id, o),
421 (Self::AwsPrivatelink(s), Self::AwsPrivatelink(o)) => s.alter_compatible(id, o),
422 (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o),
423 (Self::Csr(s), Self::Csr(o)) => s.alter_compatible(id, o),
424 (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
425 (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
426 (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
427 _ => {
428 tracing::warn!(
429 "Connection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
430 self,
431 other
432 );
433 Err(AlterError { id })
434 }
435 }
436 }
437}
438
/// Configuration for a generic Iceberg REST catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct RestIcebergCatalog {
    /// Credential passed to the catalog via the `credential` property.
    pub credential: StringOrSecret,
    /// Optional scope passed to the catalog via the `scope` property.
    pub scope: Option<String>,
    /// Optional warehouse to address within the catalog.
    pub warehouse: Option<String>,
}
448
/// Configuration for an AWS S3 Tables Iceberg REST catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct S3TablesRestIcebergCatalog<C: ConnectionAccess = InlinedConnection> {
    /// The AWS connection used to authenticate with the catalog.
    pub aws_connection: AwsConnectionReference<C>,
    /// The warehouse to address within the catalog.
    pub warehouse: String,
}
456
457impl<R: ConnectionResolver> IntoInlineConnection<S3TablesRestIcebergCatalog, R>
458 for S3TablesRestIcebergCatalog<ReferencedConnection>
459{
460 fn into_inline_connection(self, r: R) -> S3TablesRestIcebergCatalog {
461 S3TablesRestIcebergCatalog {
462 aws_connection: self.aws_connection.into_inline_connection(&r),
463 warehouse: self.warehouse,
464 }
465 }
466}
467
/// The flavor of Iceberg catalog a connection targets.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogType {
    Rest,
    S3TablesRest,
}
473
/// The flavor-specific configuration of an Iceberg catalog connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogImpl<C: ConnectionAccess = InlinedConnection> {
    Rest(RestIcebergCatalog),
    S3TablesRest(S3TablesRestIcebergCatalog<C>),
}
479
480impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogImpl, R>
481 for IcebergCatalogImpl<ReferencedConnection>
482{
483 fn into_inline_connection(self, r: R) -> IcebergCatalogImpl {
484 match self {
485 IcebergCatalogImpl::Rest(rest) => IcebergCatalogImpl::Rest(rest),
486 IcebergCatalogImpl::S3TablesRest(s3tables) => {
487 IcebergCatalogImpl::S3TablesRest(s3tables.into_inline_connection(r))
488 }
489 }
490 }
491}
492
/// A connection to an Iceberg catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergCatalogConnection<C: ConnectionAccess = InlinedConnection> {
    /// Flavor-specific catalog configuration.
    pub catalog: IcebergCatalogImpl<C>,
    /// The URI of the catalog endpoint.
    pub uri: reqwest::Url,
}
500
impl AlterCompatible for IcebergCatalogConnection {
    /// Altering an Iceberg catalog connection is never considered compatible:
    /// every alteration is rejected regardless of the proposed change.
    fn alter_compatible(&self, id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        Err(AlterError { id })
    }
}
506
507impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogConnection, R>
508 for IcebergCatalogConnection<ReferencedConnection>
509{
510 fn into_inline_connection(self, r: R) -> IcebergCatalogConnection {
511 IcebergCatalogConnection {
512 catalog: self.catalog.into_inline_connection(&r),
513 uri: self.uri,
514 }
515 }
516}
517
impl<C: ConnectionAccess> IcebergCatalogConnection<C> {
    /// Iceberg catalog connections are validated on creation by default.
    fn validate_by_default(&self) -> bool {
        true
    }
}
523
524impl IcebergCatalogConnection<InlinedConnection> {
525 pub async fn connect(
526 &self,
527 storage_configuration: &StorageConfiguration,
528 in_task: InTask,
529 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
530 match self.catalog {
531 IcebergCatalogImpl::S3TablesRest(ref s3tables) => {
532 self.connect_s3tables(s3tables, storage_configuration, in_task)
533 .await
534 }
535 IcebergCatalogImpl::Rest(ref rest) => {
536 self.connect_rest(rest, storage_configuration, in_task)
537 .await
538 }
539 }
540 }
541
542 pub fn catalog_type(&self) -> IcebergCatalogType {
543 match self.catalog {
544 IcebergCatalogImpl::S3TablesRest(_) => IcebergCatalogType::S3TablesRest,
545 IcebergCatalogImpl::Rest(_) => IcebergCatalogType::Rest,
546 }
547 }
548
549 pub fn s3tables_catalog(&self) -> Option<&S3TablesRestIcebergCatalog> {
550 match &self.catalog {
551 IcebergCatalogImpl::S3TablesRest(s3tables) => Some(s3tables),
552 _ => None,
553 }
554 }
555
556 pub fn rest_catalog(&self) -> Option<&RestIcebergCatalog> {
557 match &self.catalog {
558 IcebergCatalogImpl::Rest(rest) => Some(rest),
559 _ => None,
560 }
561 }
562
563 async fn connect_s3tables(
564 &self,
565 s3tables: &S3TablesRestIcebergCatalog,
566 storage_configuration: &StorageConfiguration,
567 in_task: InTask,
568 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
569 let secret_reader = &storage_configuration.connection_context.secrets_reader;
570 let aws_ref = &s3tables.aws_connection;
571 let aws_config = aws_ref
572 .connection
573 .load_sdk_config(
574 &storage_configuration.connection_context,
575 aws_ref.connection_id,
576 in_task,
577 )
578 .await?;
579
580 let aws_region = aws_ref
581 .connection
582 .region
583 .clone()
584 .unwrap_or_else(|| "us-east-1".to_string());
585
586 let mut props = vec![
587 (S3_REGION.to_string(), aws_region),
588 (S3_DISABLE_EC2_METADATA.to_string(), "true".to_string()),
589 (
590 REST_CATALOG_PROP_WAREHOUSE.to_string(),
591 s3tables.warehouse.clone(),
592 ),
593 (REST_CATALOG_PROP_URI.to_string(), self.uri.to_string()),
594 ];
595
596 let aws_auth = aws_ref.connection.auth.clone();
597
598 if let AwsAuth::Credentials(creds) = &aws_auth {
599 props.push((
600 S3_ACCESS_KEY_ID.to_string(),
601 creds
602 .access_key_id
603 .get_string(in_task, secret_reader)
604 .await?,
605 ));
606 props.push((
607 S3_SECRET_ACCESS_KEY.to_string(),
608 secret_reader.read_string(creds.secret_access_key).await?,
609 ));
610 }
611
612 let catalog = RestCatalogBuilder::default()
616 .with_aws_config(aws_config.clone())
617 .load("IcebergCatalog", props.into_iter().collect())
618 .await?;
619
620 let catalog = if matches!(aws_auth, AwsAuth::AssumeRole(_)) {
621 let credentials_provider = aws_config
622 .credentials_provider()
623 .ok_or_else(|| anyhow!("aws_config missing credentials provider"))?;
624 let file_io_loader = CustomAwsCredentialLoader::new(Arc::new(
625 AwsSdkCredentialLoader::new(credentials_provider),
626 ));
627 catalog.with_file_io_extension(file_io_loader)
628 } else {
629 catalog
630 };
631
632 Ok(Arc::new(catalog))
633 }
634
635 async fn connect_rest(
636 &self,
637 rest: &RestIcebergCatalog,
638 storage_configuration: &StorageConfiguration,
639 in_task: InTask,
640 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
641 let mut props = BTreeMap::from([(
642 REST_CATALOG_PROP_URI.to_string(),
643 self.uri.to_string().clone(),
644 )]);
645
646 if let Some(warehouse) = &rest.warehouse {
647 props.insert(REST_CATALOG_PROP_WAREHOUSE.to_string(), warehouse.clone());
648 }
649
650 let credential = rest
651 .credential
652 .get_string(
653 in_task,
654 &storage_configuration.connection_context.secrets_reader,
655 )
656 .await
657 .map_err(|e| anyhow!("failed to read Iceberg catalog credential: {e}"))?;
658 props.insert(REST_CATALOG_PROP_CREDENTIAL.to_string(), credential);
659
660 if let Some(scope) = &rest.scope {
661 props.insert(REST_CATALOG_PROP_SCOPE.to_string(), scope.clone());
662 }
663
664 let catalog = RestCatalogBuilder::default()
665 .load("IcebergCatalog", props.into_iter().collect())
666 .await
667 .map_err(|e| anyhow!("failed to create Iceberg catalog: {e}"))?;
668 Ok(Arc::new(catalog))
669 }
670
671 async fn validate(
672 &self,
673 _id: CatalogItemId,
674 storage_configuration: &StorageConfiguration,
675 ) -> Result<(), ConnectionValidationError> {
676 let catalog = self
677 .connect(storage_configuration, InTask::No)
678 .await
679 .map_err(|e| {
680 ConnectionValidationError::Other(anyhow!("failed to connect to catalog: {e}"))
681 })?;
682
683 catalog.list_namespaces(None).await.map_err(|e| {
685 ConnectionValidationError::Other(anyhow!("failed to list namespaces: {e}"))
686 })?;
687
688 Ok(())
689 }
690}
691
/// An AWS PrivateLink connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelinkConnection {
    /// The name of the PrivateLink service to connect to.
    pub service_name: String,
    /// The availability zones in which the service is available.
    pub availability_zones: Vec<String>,
}
697
impl AlterCompatible for AwsPrivatelinkConnection {
    /// Every field of an AWS PrivateLink connection is considered alterable;
    /// compatibility checks always succeed.
    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        Ok(())
    }
}
704
/// TLS configuration for a Kafka connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaTlsConfig {
    /// Optional client identity (key and certificate) for mutual TLS.
    pub identity: Option<TlsIdentity>,
    /// Optional root certificate used to verify the broker.
    pub root_cert: Option<StringOrSecret>,
}
710
/// SASL configuration for a Kafka connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaSaslConfig<C: ConnectionAccess = InlinedConnection> {
    /// The SASL mechanism (passed through as `sasl.mechanisms`).
    pub mechanism: String,
    /// The SASL username (passed through as `sasl.username`).
    pub username: StringOrSecret,
    /// Optional secret holding the SASL password.
    pub password: Option<CatalogItemId>,
    /// Optional AWS connection — presumably for IAM-based authentication;
    /// its SDK config is loaded when building the client.
    pub aws: Option<AwsConnectionReference<C>>,
}
718
719impl<R: ConnectionResolver> IntoInlineConnection<KafkaSaslConfig, R>
720 for KafkaSaslConfig<ReferencedConnection>
721{
722 fn into_inline_connection(self, r: R) -> KafkaSaslConfig {
723 KafkaSaslConfig {
724 mechanism: self.mechanism,
725 username: self.username,
726 password: self.password,
727 aws: self.aws.map(|aws| aws.into_inline_connection(&r)),
728 }
729 }
730}
731
/// A single Kafka broker and how to reach it.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaBroker<C: ConnectionAccess = InlinedConnection> {
    /// The broker address, as `host` or `host:port`.
    pub address: String,
    /// The tunnel (if any) through which to reach the broker.
    pub tunnel: Tunnel<C>,
}
740
741impl<R: ConnectionResolver> IntoInlineConnection<KafkaBroker, R>
742 for KafkaBroker<ReferencedConnection>
743{
744 fn into_inline_connection(self, r: R) -> KafkaBroker {
745 let KafkaBroker { address, tunnel } = self;
746 KafkaBroker {
747 address,
748 tunnel: tunnel.into_inline_connection(r),
749 }
750 }
751}
752
/// Options applied when creating a Kafka topic.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Default)]
pub struct KafkaTopicOptions {
    /// Replication factor for the topic; `None` leaves the broker default.
    pub replication_factor: Option<NonNeg<i32>>,
    /// Partition count for the topic; `None` leaves the broker default.
    pub partition_count: Option<NonNeg<i32>>,
    /// Additional raw topic-level configuration entries.
    pub topic_config: BTreeMap<String, String>,
}
764
/// A connection to a Kafka cluster.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaConnection<C: ConnectionAccess = InlinedConnection> {
    /// The brokers to bootstrap from.
    pub brokers: Vec<KafkaBroker<C>>,
    /// The tunnel to use for brokers without an explicit per-broker tunnel.
    pub default_tunnel: Tunnel<C>,
    /// Optional user-specified progress topic; see `progress_topic` for the
    /// derived default.
    pub progress_topic: Option<String>,
    /// Options used when creating the progress topic.
    pub progress_topic_options: KafkaTopicOptions,
    /// Additional raw librdkafka configuration options.
    pub options: BTreeMap<String, StringOrSecret>,
    /// Optional TLS configuration.
    pub tls: Option<KafkaTlsConfig>,
    /// Optional SASL configuration.
    pub sasl: Option<KafkaSaslConfig<C>>,
}
778
779impl<R: ConnectionResolver> IntoInlineConnection<KafkaConnection, R>
780 for KafkaConnection<ReferencedConnection>
781{
782 fn into_inline_connection(self, r: R) -> KafkaConnection {
783 let KafkaConnection {
784 brokers,
785 progress_topic,
786 progress_topic_options,
787 default_tunnel,
788 options,
789 tls,
790 sasl,
791 } = self;
792
793 let brokers = brokers
794 .into_iter()
795 .map(|broker| broker.into_inline_connection(&r))
796 .collect();
797
798 KafkaConnection {
799 brokers,
800 progress_topic,
801 progress_topic_options,
802 default_tunnel: default_tunnel.into_inline_connection(&r),
803 options,
804 tls,
805 sasl: sasl.map(|sasl| sasl.into_inline_connection(&r)),
806 }
807 }
808}
809
810impl<C: ConnectionAccess> KafkaConnection<C> {
811 pub fn progress_topic(
816 &self,
817 connection_context: &ConnectionContext,
818 connection_id: CatalogItemId,
819 ) -> Cow<'_, str> {
820 if let Some(progress_topic) = &self.progress_topic {
821 Cow::Borrowed(progress_topic)
822 } else {
823 Cow::Owned(format!(
824 "_materialize-progress-{}-{}",
825 connection_context.environment_id, connection_id,
826 ))
827 }
828 }
829
830 fn validate_by_default(&self) -> bool {
831 true
832 }
833}
834
impl KafkaConnection {
    /// Generates a base identifier string combining the environment,
    /// connection, and object IDs; `enrich_client_id` can then append
    /// rule-driven payloads to it.
    pub fn id_base(
        connection_context: &ConnectionContext,
        connection_id: CatalogItemId,
        object_id: GlobalId,
    ) -> String {
        format!(
            "materialize-{}-{}-{}",
            connection_context.environment_id, connection_id, object_id,
        )
    }

    /// Appends payloads to `client_id` for every configured enrichment rule
    /// whose pattern matches one of this connection's broker addresses.
    pub fn enrich_client_id(&self, configs: &ConfigSet, client_id: &mut String) {
        // Rules are decoded from the `KAFKA_CLIENT_ID_ENRICHMENT_RULES`
        // dyncfg as JSON: a list of `{ pattern, payload }` objects where
        // `pattern` is a regex applied to broker addresses.
        #[derive(Debug, Deserialize)]
        struct EnrichmentRule {
            #[serde(deserialize_with = "deserialize_regex")]
            pattern: Regex,
            payload: String,
        }

        fn deserialize_regex<'de, D>(deserializer: D) -> Result<Regex, D::Error>
        where
            D: Deserializer<'de>,
        {
            let buf = String::deserialize(deserializer)?;
            Regex::new(&buf).map_err(serde::de::Error::custom)
        }

        let rules = KAFKA_CLIENT_ID_ENRICHMENT_RULES.get(configs);
        let rules = match serde_json::from_value::<Vec<EnrichmentRule>>(rules) {
            Ok(rules) => rules,
            Err(e) => {
                // Enrichment is best effort: undecodable rules are logged
                // and ignored rather than failing client construction.
                warn!(%e, "failed to decode kafka_client_id_enrichment_rules");
                return;
            }
        };

        debug!(?self.brokers, "evaluating client ID enrichment rules");
        for rule in rules {
            // A rule applies if any broker address matches its pattern.
            let is_match = self
                .brokers
                .iter()
                .any(|b| rule.pattern.is_match(&b.address));
            debug!(?rule, is_match, "evaluated client ID enrichment rule");
            if is_match {
                client_id.push('-');
                client_id.push_str(&rule.payload);
            }
        }
    }

    /// Builds an rdkafka client of type `T` for this connection, wiring up
    /// bootstrap servers, security options, backoff settings, and any
    /// SSH/PrivateLink tunnels into a [`TunnelingClientContext`].
    pub async fn create_with_context<C, T>(
        &self,
        storage_configuration: &StorageConfiguration,
        context: C,
        extra_options: &BTreeMap<&str, String>,
        in_task: InTask,
    ) -> Result<T, ContextCreationError>
    where
        C: ClientContext,
        T: FromClientConfigAndContext<TunnelingClientContext<C>>,
    {
        // Start from the user's raw options and layer computed settings on
        // top; later inserts overwrite earlier ones for the same key.
        let mut options = self.options.clone();

        options.insert("allow.auto.create.topics".into(), "false".into());

        let brokers = match &self.default_tunnel {
            Tunnel::AwsPrivatelink(t) => {
                // A default PrivateLink tunnel replaces the broker list
                // entirely; explicit brokers must not also be configured.
                assert!(&self.brokers.is_empty());

                let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM
                    .get(storage_configuration.config_set());
                options.insert("ssl.endpoint.identification.algorithm".into(), algo.into());

                format!(
                    "{}:{}",
                    vpc_endpoint_host(
                        t.connection_id,
                        // No specific availability zone for the default
                        // tunnel endpoint.
                        None,
                    ),
                    // Fall back to the conventional Kafka port when the
                    // tunnel does not specify one.
                    t.port.unwrap_or(9092)
                )
            }
            _ => self.brokers.iter().map(|b| &b.address).join(","),
        };
        options.insert("bootstrap.servers".into(), brokers.into());
        // Security protocol is implied by which of TLS/SASL are configured.
        let security_protocol = match (self.tls.is_some(), self.sasl.is_some()) {
            (false, false) => "PLAINTEXT",
            (true, false) => "SSL",
            (false, true) => "SASL_PLAINTEXT",
            (true, true) => "SASL_SSL",
        };
        options.insert("security.protocol".into(), security_protocol.into());
        if let Some(tls) = &self.tls {
            if let Some(root_cert) = &tls.root_cert {
                options.insert("ssl.ca.pem".into(), root_cert.clone());
            }
            if let Some(identity) = &tls.identity {
                options.insert("ssl.key.pem".into(), StringOrSecret::Secret(identity.key));
                options.insert("ssl.certificate.pem".into(), identity.cert.clone());
            }
        }
        if let Some(sasl) = &self.sasl {
            options.insert("sasl.mechanisms".into(), (&sasl.mechanism).into());
            options.insert("sasl.username".into(), sasl.username.clone());
            if let Some(password) = sasl.password {
                options.insert("sasl.password".into(), StringOrSecret::Secret(password));
            }
        }

        // Retry/reconnect backoff settings come from dyncfgs so they can be
        // tuned without a restart.
        options.insert(
            "retry.backoff.ms".into(),
            KAFKA_RETRY_BACKOFF
                .get(storage_configuration.config_set())
                .as_millis()
                .into(),
        );
        options.insert(
            "retry.backoff.max.ms".into(),
            KAFKA_RETRY_BACKOFF_MAX
                .get(storage_configuration.config_set())
                .as_millis()
                .into(),
        );
        options.insert(
            "reconnect.backoff.ms".into(),
            KAFKA_RECONNECT_BACKOFF
                .get(storage_configuration.config_set())
                .as_millis()
                .into(),
        );
        options.insert(
            "reconnect.backoff.max.ms".into(),
            KAFKA_RECONNECT_BACKOFF_MAX
                .get(storage_configuration.config_set())
                .as_millis()
                .into(),
        );

        let mut config = mz_kafka_util::client::create_new_client_config(
            storage_configuration
                .connection_context
                .librdkafka_log_level,
            storage_configuration.parameters.kafka_timeout_config,
        );
        // Resolve secrets to strings as options are copied into the config.
        for (k, v) in options {
            config.set(
                k,
                v.get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await
                .context("reading kafka secret")?,
            );
        }
        // Caller-supplied options take precedence over everything above.
        for (k, v) in extra_options {
            config.set(*k, v);
        }

        // Load an AWS SDK config only when SASL references an AWS connection.
        let aws_config = match self.sasl.as_ref().and_then(|sasl| sasl.aws.as_ref()) {
            None => None,
            Some(aws) => Some(
                aws.connection
                    .load_sdk_config(
                        &storage_configuration.connection_context,
                        aws.connection_id,
                        in_task,
                    )
                    .await?,
            ),
        };

        let mut context = TunnelingClientContext::new(
            context,
            Handle::current(),
            storage_configuration
                .connection_context
                .ssh_tunnel_manager
                .clone(),
            storage_configuration.parameters.ssh_timeout_config,
            aws_config,
            in_task,
        );

        // Configure the default tunnel applied to brokers without a
        // per-broker override.
        match &self.default_tunnel {
            Tunnel::Direct => {
                // No default tunnel: brokers are dialed directly.
            }
            Tunnel::AwsPrivatelink(pl) => {
                context.set_default_tunnel(TunnelConfig::StaticHost(vpc_endpoint_host(
                    pl.connection_id,
                    // No specific availability zone for the default tunnel.
                    None,
                )));
            }
            Tunnel::Ssh(ssh_tunnel) => {
                let secret = storage_configuration
                    .connection_context
                    .secrets_reader
                    .read_in_task_if(in_task, ssh_tunnel.connection_id)
                    .await?;
                let key_pair = SshKeyPair::from_bytes(&secret)?;

                // Resolve the bastion host up front so connection attempts
                // can rotate through all of its addresses.
                let resolved = resolve_address(
                    &ssh_tunnel.connection.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig {
                    host: resolved
                        .iter()
                        .map(|a| a.to_string())
                        .collect::<BTreeSet<_>>(),
                    port: ssh_tunnel.connection.port,
                    user: ssh_tunnel.connection.user.clone(),
                    key_pair,
                }));
            }
        }

        // Register per-broker tunnels and address rewrites.
        for broker in &self.brokers {
            let mut addr_parts = broker.address.splitn(2, ':');
            let addr = BrokerAddr {
                host: addr_parts
                    .next()
                    .context("BROKER is not address:port")?
                    .into(),
                // Default to the conventional Kafka port when none is given.
                port: addr_parts
                    .next()
                    .unwrap_or("9092")
                    .parse()
                    .context("parsing BROKER port")?,
            };
            match &broker.tunnel {
                Tunnel::Direct => {
                    // No per-broker tunnel; the default tunnel (if any)
                    // configured above applies.
                }
                Tunnel::AwsPrivatelink(aws_privatelink) => {
                    // Rewrite the broker address to its VPC endpoint host.
                    let host = mz_cloud_resources::vpc_endpoint_host(
                        aws_privatelink.connection_id,
                        aws_privatelink.availability_zone.as_deref(),
                    );
                    let port = aws_privatelink.port;
                    context.add_broker_rewrite(
                        addr,
                        BrokerRewrite {
                            host: host.clone(),
                            port,
                        },
                    );
                }
                Tunnel::Ssh(ssh_tunnel) => {
                    let ssh_host_resolved = resolve_address(
                        &ssh_tunnel.connection.host,
                        ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                    )
                    .await?;
                    context
                        .add_ssh_tunnel(
                            addr,
                            SshTunnelConfig {
                                host: ssh_host_resolved
                                    .iter()
                                    .map(|a| a.to_string())
                                    .collect::<BTreeSet<_>>(),
                                port: ssh_tunnel.connection.port,
                                user: ssh_tunnel.connection.user.clone(),
                                key_pair: SshKeyPair::from_bytes(
                                    &storage_configuration
                                        .connection_context
                                        .secrets_reader
                                        .read_in_task_if(in_task, ssh_tunnel.connection_id)
                                        .await?,
                                )?,
                            },
                        )
                        .await
                        .map_err(ContextCreationError::Ssh)?;
                }
            }
        }

        Ok(config.create_with_context(context)?)
    }

    /// Validates the connection by building a consumer and fetching cluster
    /// metadata.
    async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<(), anyhow::Error> {
        let (context, error_rx) = MzClientContext::with_errors();
        let consumer: BaseConsumer<_> = self
            .create_with_context(
                storage_configuration,
                context,
                &BTreeMap::new(),
                InTask::No,
            )
            .await?;
        let consumer = Arc::new(consumer);

        let timeout = storage_configuration
            .parameters
            .kafka_timeout_config
            .fetch_metadata_timeout;

        // `fetch_metadata` blocks, so run it on the blocking thread pool.
        let result = mz_ore::task::spawn_blocking(|| "kafka_get_metadata", {
            let consumer = Arc::clone(&consumer);
            move || consumer.fetch_metadata(None, timeout)
        })
        .await;
        match result {
            Ok(_) => Ok(()),
            Err(err) => {
                // Prefer an error reported by the client context over the
                // generic fetch error; `Internal` errors are the least
                // specific, so keep scanning past them.
                let main_err = error_rx.try_iter().reduce(|cur, new| match cur {
                    MzKafkaError::Internal(_) => new,
                    _ => cur,
                });

                drop(consumer);

                match main_err {
                    Some(err) => Err(err.into()),
                    None => Err(err.into()),
                }
            }
        }
    }
}
1214
1215impl<C: ConnectionAccess> AlterCompatible for KafkaConnection<C> {
1216 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1217 let KafkaConnection {
1218 brokers: _,
1219 default_tunnel: _,
1220 progress_topic,
1221 progress_topic_options,
1222 options: _,
1223 tls: _,
1224 sasl: _,
1225 } = self;
1226
1227 let compatibility_checks = [
1228 (progress_topic == &other.progress_topic, "progress_topic"),
1229 (
1230 progress_topic_options == &other.progress_topic_options,
1231 "progress_topic_options",
1232 ),
1233 ];
1234
1235 for (compatible, field) in compatibility_checks {
1236 if !compatible {
1237 tracing::warn!(
1238 "KafkaConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1239 self,
1240 other
1241 );
1242
1243 return Err(AlterError { id });
1244 }
1245 }
1246
1247 Ok(())
1248 }
1249}
1250
/// A connection to a Confluent Schema Registry.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnection<C: ConnectionAccess = InlinedConnection> {
    /// The URL of the schema registry.
    pub url: Url,
    /// Trusted root TLS certificate, in PEM format.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication with the
    /// registry.
    pub tls_identity: Option<TlsIdentity>,
    /// Optional HTTP authentication credentials for the registry.
    pub http_auth: Option<CsrConnectionHttpAuth>,
    /// A tunnel through which to route traffic.
    pub tunnel: Tunnel<C>,
}
1266
1267impl<R: ConnectionResolver> IntoInlineConnection<CsrConnection, R>
1268 for CsrConnection<ReferencedConnection>
1269{
1270 fn into_inline_connection(self, r: R) -> CsrConnection {
1271 let CsrConnection {
1272 url,
1273 tls_root_cert,
1274 tls_identity,
1275 http_auth,
1276 tunnel,
1277 } = self;
1278 CsrConnection {
1279 url,
1280 tls_root_cert,
1281 tls_identity,
1282 http_auth,
1283 tunnel: tunnel.into_inline_connection(r),
1284 }
1285 }
1286}
1287
impl<C: ConnectionAccess> CsrConnection<C> {
    /// Whether this connection type is validated on creation unless the user
    /// explicitly opts out.
    fn validate_by_default(&self) -> bool {
        true
    }
}
1293
impl CsrConnection {
    /// Builds an [`mz_ccsr::Client`] for this connection, reading secrets via
    /// the storage configuration's secrets reader and routing traffic through
    /// the configured tunnel.
    pub async fn connect(
        &self,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_ccsr::Client, CsrConnectError> {
        let mut client_config = mz_ccsr::ClientConfig::new(self.url.clone());
        // Install the custom trusted root certificate, if one was provided.
        if let Some(root_cert) = &self.tls_root_cert {
            let root_cert = root_cert
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let root_cert = Certificate::from_pem(root_cert.as_bytes())?;
            client_config = client_config.add_root_certificate(root_cert);
        }

        // Install the TLS client identity (cert + secret key), if configured.
        if let Some(tls_identity) = &self.tls_identity {
            let key = &storage_configuration
                .connection_context
                .secrets_reader
                .read_string_in_task_if(in_task, tls_identity.key)
                .await?;
            let cert = tls_identity
                .cert
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let ident = Identity::from_pem(key.as_bytes(), cert.as_bytes())?;
            client_config = client_config.identity(ident);
        }

        // Install HTTP basic-auth credentials, if configured.
        if let Some(http_auth) = &self.http_auth {
            let username = http_auth
                .username
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let password = match http_auth.password {
                None => None,
                Some(password) => Some(
                    storage_configuration
                        .connection_context
                        .secrets_reader
                        .read_string_in_task_if(in_task, password)
                        .await?,
                ),
            };
            client_config = client_config.auth(username, password);
        }

        let host = self
            .url
            .host_str()
            .ok_or_else(|| anyhow!("url missing host"))?;
        match &self.tunnel {
            Tunnel::Direct => {
                // Resolve the address ourselves so the external-address
                // policy (`ENFORCE_EXTERNAL_ADDRESSES`) can be enforced.
                let resolved = resolve_address(
                    host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                client_config = client_config.resolve_to_addrs(
                    host,
                    &resolved
                        .iter()
                        .map(|addr| SocketAddr::new(*addr, 0))
                        .collect::<Vec<_>>(),
                )
            }
            Tunnel::Ssh(ssh_tunnel) => {
                // NOTE(review): `Url::port()` is `None` for URLs without an
                // explicit port (it does not substitute scheme defaults), so
                // an `https` URL without a port tunnels to 80 here — confirm
                // this is intended.
                let ssh_tunnel = ssh_tunnel
                    .connect(
                        storage_configuration,
                        host,
                        self.url.port().unwrap_or(80),
                        in_task,
                    )
                    .await
                    .map_err(CsrConnectError::Ssh)?;

                client_config = client_config
                    // Route the registry host to the tunnel's local endpoint.
                    .resolve_to_addrs(host, &[SocketAddr::new(ssh_tunnel.local_addr().ip(), 0)])
                    // Compute the URL lazily so each use picks up the
                    // tunnel's current local port.
                    .dynamic_url({
                        let remote_url = self.url.clone();
                        move || {
                            let mut url = remote_url.clone();
                            url.set_port(Some(ssh_tunnel.local_addr().port()))
                                .expect("cannot fail");
                            url
                        }
                    });
            }
            Tunnel::AwsPrivatelink(connection) => {
                // PrivateLink tunnels for CSR carry no port override.
                assert_none!(connection.port);

                let privatelink_host = mz_cloud_resources::vpc_endpoint_host(
                    connection.connection_id,
                    connection.availability_zone.as_deref(),
                );
                let addrs: Vec<_> = net::lookup_host((privatelink_host, 0))
                    .await
                    .context("resolving PrivateLink host")?
                    .collect();
                client_config = client_config.resolve_to_addrs(host, &addrs)
            }
        }

        Ok(client_config.build()?)
    }

    /// Validates the connection by listing the registry's subjects.
    async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<(), anyhow::Error> {
        let client = self
            .connect(
                storage_configuration,
                InTask::No,
            )
            .await?;
        client.list_subjects().await?;
        Ok(())
    }
}
1452
1453impl<C: ConnectionAccess> AlterCompatible for CsrConnection<C> {
1454 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1455 let CsrConnection {
1456 tunnel,
1457 url: _,
1459 tls_root_cert: _,
1460 tls_identity: _,
1461 http_auth: _,
1462 } = self;
1463
1464 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1465
1466 for (compatible, field) in compatibility_checks {
1467 if !compatible {
1468 tracing::warn!(
1469 "CsrConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1470 self,
1471 other
1472 );
1473
1474 return Err(AlterError { id });
1475 }
1476 }
1477 Ok(())
1478 }
1479}
1480
/// A TLS client identity: a certificate plus a reference to the secret
/// holding its private key.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct TlsIdentity {
    /// The client's TLS certificate, in PEM format.
    pub cert: StringOrSecret,
    /// Reference to the secret containing the client's private key, in PEM
    /// format.
    pub key: CatalogItemId,
}
1490
/// HTTP authentication credentials for a schema registry connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnectionHttpAuth {
    /// The username.
    pub username: StringOrSecret,
    /// Optional reference to the secret containing the password.
    pub password: Option<CatalogItemId>,
}
1499
/// A connection to a PostgreSQL server.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct PostgresConnection<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// The name of the database to connect to.
    pub database: String,
    /// The username to authenticate as.
    pub user: StringOrSecret,
    /// Optional reference to the secret containing the password.
    pub password: Option<CatalogItemId>,
    /// A tunnel through which to route traffic.
    pub tunnel: Tunnel<C>,
    /// The TLS mode to request (passed to `tokio_postgres` as `ssl_mode`).
    pub tls_mode: SslMode,
    /// Trusted root TLS certificate, in PEM format.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication with the server.
    pub tls_identity: Option<TlsIdentity>,
}
1523
1524impl<R: ConnectionResolver> IntoInlineConnection<PostgresConnection, R>
1525 for PostgresConnection<ReferencedConnection>
1526{
1527 fn into_inline_connection(self, r: R) -> PostgresConnection {
1528 let PostgresConnection {
1529 host,
1530 port,
1531 database,
1532 user,
1533 password,
1534 tunnel,
1535 tls_mode,
1536 tls_root_cert,
1537 tls_identity,
1538 } = self;
1539
1540 PostgresConnection {
1541 host,
1542 port,
1543 database,
1544 user,
1545 password,
1546 tunnel: tunnel.into_inline_connection(r),
1547 tls_mode,
1548 tls_root_cert,
1549 tls_identity,
1550 }
1551 }
1552}
1553
impl<C: ConnectionAccess> PostgresConnection<C> {
    /// Whether this connection type is validated on creation unless the user
    /// explicitly opts out.
    fn validate_by_default(&self) -> bool {
        true
    }
}
1559
impl PostgresConnection<InlinedConnection> {
    /// Builds the [`mz_postgres_util::Config`] used to open connections to
    /// this server, reading secrets via `secrets_reader` and resolving the
    /// tunnel configuration.
    pub async fn config(
        &self,
        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_postgres_util::Config, anyhow::Error> {
        let params = &storage_configuration.parameters;

        let mut config = tokio_postgres::Config::new();
        config
            .host(&self.host)
            .port(self.port)
            .dbname(&self.database)
            .user(&self.user.get_string(in_task, secrets_reader).await?)
            .ssl_mode(self.tls_mode);
        if let Some(password) = self.password {
            let password = secrets_reader
                .read_string_in_task_if(in_task, password)
                .await?;
            config.password(password);
        }
        if let Some(tls_root_cert) = &self.tls_root_cert {
            let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
            config.ssl_root_cert(tls_root_cert.as_bytes());
        }
        if let Some(tls_identity) = &self.tls_identity {
            let cert = tls_identity
                .cert
                .get_string(in_task, secrets_reader)
                .await?;
            let key = secrets_reader
                .read_string_in_task_if(in_task, tls_identity.key)
                .await?;
            config.ssl_cert(cert.as_bytes()).ssl_key(key.as_bytes());
        }

        // Client-side timeout and TCP keepalive tuning, driven by storage
        // parameters.
        if let Some(connect_timeout) = params.pg_source_connect_timeout {
            config.connect_timeout(connect_timeout);
        }
        if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
            config.keepalives_retries(keepalives_retries);
        }
        if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
            config.keepalives_idle(keepalives_idle);
        }
        if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
            config.keepalives_interval(keepalives_interval);
        }
        if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
            config.tcp_user_timeout(tcp_user_timeout);
        }

        // Server-side session settings, passed via the `options` startup
        // parameter.
        let mut options = vec![];
        if let Some(wal_sender_timeout) = params.pg_source_wal_sender_timeout {
            options.push(format!(
                "--wal_sender_timeout={}",
                wal_sender_timeout.as_millis()
            ));
        };
        // Optionally mirror the client-side keepalive settings on the server.
        if params.pg_source_tcp_configure_server {
            if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
                options.push(format!("--tcp_keepalives_count={}", keepalives_retries));
            }
            if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
                options.push(format!(
                    "--tcp_keepalives_idle={}",
                    keepalives_idle.as_secs()
                ));
            }
            if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
                options.push(format!(
                    "--tcp_keepalives_interval={}",
                    keepalives_interval.as_secs()
                ));
            }
            if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
                options.push(format!(
                    "--tcp_user_timeout={}",
                    tcp_user_timeout.as_millis()
                ));
            }
        }
        config.options(options.join(" ").as_str());

        let tunnel = match &self.tunnel {
            Tunnel::Direct => {
                // Resolve the address ourselves so the external-address
                // policy (`ENFORCE_EXTERNAL_ADDRESSES`) can be enforced.
                let resolved = resolve_address(
                    &self.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_postgres_util::TunnelConfig::Direct {
                    resolved_ips: Some(resolved),
                }
            }
            Tunnel::Ssh(SshTunnel {
                connection_id,
                connection,
            }) => {
                // The SSH key pair is stored as a secret.
                let secret = secrets_reader
                    .read_in_task_if(in_task, *connection_id)
                    .await?;
                let key_pair = SshKeyPair::from_bytes(&secret)?;
                let resolved = resolve_address(
                    &connection.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_postgres_util::TunnelConfig::Ssh {
                    config: SshTunnelConfig {
                        host: resolved
                            .iter()
                            .map(|a| a.to_string())
                            .collect::<BTreeSet<_>>(),
                        port: connection.port,
                        user: connection.user.clone(),
                        key_pair,
                    },
                }
            }
            Tunnel::AwsPrivatelink(connection) => {
                // No port override is expected for PostgreSQL PrivateLink
                // tunnels.
                assert_none!(connection.port);
                mz_postgres_util::TunnelConfig::AwsPrivatelink {
                    connection_id: connection.connection_id,
                }
            }
        };

        Ok(mz_postgres_util::Config::new(
            config,
            tunnel,
            params.ssh_timeout_config,
            in_task,
        )?)
    }

    /// Validates the connection by connecting and checking the server-side
    /// prerequisites for logical replication: `wal_level`, `max_wal_senders`,
    /// and available replication slots.
    pub async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<mz_postgres_util::Client, anyhow::Error> {
        let config = self
            .config(
                &storage_configuration.connection_context.secrets_reader,
                storage_configuration,
                InTask::No,
            )
            .await?;
        let client = config
            .connect(
                "connection validation",
                &storage_configuration.connection_context.ssh_tunnel_manager,
            )
            .await?;

        let wal_level = mz_postgres_util::get_wal_level(&client).await?;

        if wal_level < mz_postgres_util::replication::WalLevel::Logical {
            Err(PostgresConnectionValidationError::InsufficientWalLevel { wal_level })?;
        }

        let max_wal_senders = mz_postgres_util::get_max_wal_senders(&client).await?;

        // `max_wal_senders == 0` means replication is disabled entirely.
        if max_wal_senders < 1 {
            Err(PostgresConnectionValidationError::ReplicationDisabled)?;
        }

        let available_replication_slots =
            mz_postgres_util::available_replication_slots(&client).await?;

        // Validation requires at least two free replication slots.
        if available_replication_slots < 2 {
            Err(
                PostgresConnectionValidationError::InsufficientReplicationSlotsAvailable {
                    count: 2,
                },
            )?;
        }

        Ok(client)
    }
}
1746
/// Errors that cause validation of a PostgreSQL connection to fail.
#[derive(Debug, Clone, thiserror::Error)]
pub enum PostgresConnectionValidationError {
    /// The server has fewer than `count` replication slots available.
    #[error("PostgreSQL server has insufficient number of replication slots available")]
    InsufficientReplicationSlotsAvailable { count: usize },
    /// The server's `wal_level` is below `logical`.
    #[error("server must have wal_level >= logical, but has {wal_level}")]
    InsufficientWalLevel {
        wal_level: mz_postgres_util::replication::WalLevel,
    },
    /// Replication is disabled on the server (`max_wal_senders` < 1).
    #[error("replication disabled on server")]
    ReplicationDisabled,
}
1758
1759impl PostgresConnectionValidationError {
1760 pub fn detail(&self) -> Option<String> {
1761 match self {
1762 Self::InsufficientReplicationSlotsAvailable { count } => Some(format!(
1763 "executing this statement requires {} replication slot{}",
1764 count,
1765 if *count == 1 { "" } else { "s" }
1766 )),
1767 _ => None,
1768 }
1769 }
1770
1771 pub fn hint(&self) -> Option<String> {
1772 match self {
1773 Self::InsufficientReplicationSlotsAvailable { .. } => Some(
1774 "you might be able to wait for other sources to finish snapshotting and try again"
1775 .into(),
1776 ),
1777 Self::ReplicationDisabled => Some("set max_wal_senders to a value > 0".into()),
1778 _ => None,
1779 }
1780 }
1781}
1782
1783impl<C: ConnectionAccess> AlterCompatible for PostgresConnection<C> {
1784 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1785 let PostgresConnection {
1786 tunnel,
1787 host: _,
1789 port: _,
1790 database: _,
1791 user: _,
1792 password: _,
1793 tls_mode: _,
1794 tls_root_cert: _,
1795 tls_identity: _,
1796 } = self;
1797
1798 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1799
1800 for (compatible, field) in compatibility_checks {
1801 if !compatible {
1802 tracing::warn!(
1803 "PostgresConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1804 self,
1805 other
1806 );
1807
1808 return Err(AlterError { id });
1809 }
1810 }
1811 Ok(())
1812 }
1813}
1814
/// Specifies how to tunnel a connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Tunnel<C: ConnectionAccess = InlinedConnection> {
    /// No tunneling.
    Direct,
    /// Route through the specified SSH tunnel connection.
    Ssh(SshTunnel<C>),
    /// Route through the specified AWS PrivateLink connection.
    AwsPrivatelink(AwsPrivatelink),
}
1825
1826impl<R: ConnectionResolver> IntoInlineConnection<Tunnel, R> for Tunnel<ReferencedConnection> {
1827 fn into_inline_connection(self, r: R) -> Tunnel {
1828 match self {
1829 Tunnel::Direct => Tunnel::Direct,
1830 Tunnel::Ssh(ssh) => Tunnel::Ssh(ssh.into_inline_connection(r)),
1831 Tunnel::AwsPrivatelink(awspl) => Tunnel::AwsPrivatelink(awspl),
1832 }
1833 }
1834}
1835
1836impl<C: ConnectionAccess> AlterCompatible for Tunnel<C> {
1837 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1838 let compatible = match (self, other) {
1839 (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o).is_ok(),
1840 (s, o) => s == o,
1841 };
1842
1843 if !compatible {
1844 tracing::warn!(
1845 "Tunnel incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
1846 self,
1847 other
1848 );
1849
1850 return Err(AlterError { id });
1851 }
1852
1853 Ok(())
1854 }
1855}
1856
/// Specifies the TLS behavior of a MySQL connection.
///
/// See `MySqlConnection::config` for how each mode maps onto `mysql_async`
/// SSL options.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum MySqlSslMode {
    /// TLS disabled.
    Disabled,
    /// TLS required, but neither certificates nor hostname are verified.
    Required,
    /// TLS required with CA verification; hostname is not verified.
    VerifyCa,
    /// TLS required with both CA and hostname verification.
    VerifyIdentity,
}
1867
/// A connection to a MySQL server.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct MySqlConnection<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// The username to authenticate as.
    pub user: StringOrSecret,
    /// Optional reference to the secret containing the password.
    pub password: Option<CatalogItemId>,
    /// A tunnel through which to route traffic.
    pub tunnel: Tunnel<C>,
    /// The TLS behavior of the connection.
    pub tls_mode: MySqlSslMode,
    /// Trusted root TLS certificate, in PEM format. Only consulted when the
    /// TLS mode performs CA verification (see `config`).
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication with the server.
    pub tls_identity: Option<TlsIdentity>,
    /// Optional reference to an AWS connection used to load AWS SDK
    /// configuration for the client (see `config`).
    pub aws_connection: Option<AwsConnectionReference<C>>,
}
1892
1893impl<R: ConnectionResolver> IntoInlineConnection<MySqlConnection, R>
1894 for MySqlConnection<ReferencedConnection>
1895{
1896 fn into_inline_connection(self, r: R) -> MySqlConnection {
1897 let MySqlConnection {
1898 host,
1899 port,
1900 user,
1901 password,
1902 tunnel,
1903 tls_mode,
1904 tls_root_cert,
1905 tls_identity,
1906 aws_connection,
1907 } = self;
1908
1909 MySqlConnection {
1910 host,
1911 port,
1912 user,
1913 password,
1914 tunnel: tunnel.into_inline_connection(&r),
1915 tls_mode,
1916 tls_root_cert,
1917 tls_identity,
1918 aws_connection: aws_connection.map(|aws| aws.into_inline_connection(&r)),
1919 }
1920 }
1921}
1922
impl<C: ConnectionAccess> MySqlConnection<C> {
    /// Whether this connection type is validated on creation unless the user
    /// explicitly opts out.
    fn validate_by_default(&self) -> bool {
        true
    }
}
1928
impl MySqlConnection<InlinedConnection> {
    /// Builds the [`mz_mysql_util::Config`] used to open connections to this
    /// server, reading secrets via `secrets_reader` and resolving TLS,
    /// tunnel, and AWS SDK configuration.
    pub async fn config(
        &self,
        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_mysql_util::Config, anyhow::Error> {
        let mut opts = mysql_async::OptsBuilder::default()
            .ip_or_hostname(&self.host)
            .tcp_port(self.port)
            .user(Some(&self.user.get_string(in_task, secrets_reader).await?));

        if let Some(password) = self.password {
            let password = secrets_reader
                .read_string_in_task_if(in_task, password)
                .await?;
            opts = opts.pass(Some(password));
        }

        // Map our TLS mode onto mysql_async SSL options: `Required` skips
        // both certificate and hostname verification, `VerifyCa` skips only
        // hostname verification, and `VerifyIdentity` verifies both.
        let mut ssl_opts = match self.tls_mode {
            MySqlSslMode::Disabled => None,
            MySqlSslMode::Required => Some(
                mysql_async::SslOpts::default()
                    .with_danger_accept_invalid_certs(true)
                    .with_danger_skip_domain_validation(true),
            ),
            MySqlSslMode::VerifyCa => {
                Some(mysql_async::SslOpts::default().with_danger_skip_domain_validation(true))
            }
            MySqlSslMode::VerifyIdentity => Some(mysql_async::SslOpts::default()),
        };

        // A custom root certificate only matters when CA verification is on.
        if matches!(
            self.tls_mode,
            MySqlSslMode::VerifyCa | MySqlSslMode::VerifyIdentity
        ) {
            if let Some(tls_root_cert) = &self.tls_root_cert {
                let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
                ssl_opts = ssl_opts.map(|opts| {
                    opts.with_root_certs(vec![tls_root_cert.as_bytes().to_vec().into()])
                });
            }
        }

        if let Some(identity) = &self.tls_identity {
            let key = secrets_reader
                .read_string_in_task_if(in_task, identity.key)
                .await?;
            let cert = identity.cert.get_string(in_task, secrets_reader).await?;
            // mysql_async takes the client identity as a PKCS#12 archive, so
            // convert the PEM key + cert.
            let Pkcs12Archive { der, pass } =
                mz_tls_util::pkcs12der_from_pem(key.as_bytes(), cert.as_bytes())?;

            ssl_opts = ssl_opts.map(|opts| {
                opts.with_client_identity(Some(
                    mysql_async::ClientIdentity::new(der.into()).with_password(pass),
                ))
            });
        }

        opts = opts.ssl_opts(ssl_opts);

        let tunnel = match &self.tunnel {
            Tunnel::Direct => {
                // Resolve the address ourselves so the external-address
                // policy (`ENFORCE_EXTERNAL_ADDRESSES`) can be enforced.
                let resolved = resolve_address(
                    &self.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_mysql_util::TunnelConfig::Direct {
                    resolved_ips: Some(resolved),
                }
            }
            Tunnel::Ssh(SshTunnel {
                connection_id,
                connection,
            }) => {
                // The SSH key pair is stored as a secret.
                let secret = secrets_reader
                    .read_in_task_if(in_task, *connection_id)
                    .await?;
                let key_pair = SshKeyPair::from_bytes(&secret)?;
                let resolved = resolve_address(
                    &connection.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_mysql_util::TunnelConfig::Ssh {
                    config: SshTunnelConfig {
                        host: resolved
                            .iter()
                            .map(|a| a.to_string())
                            .collect::<BTreeSet<_>>(),
                        port: connection.port,
                        user: connection.user.clone(),
                        key_pair,
                    },
                }
            }
            Tunnel::AwsPrivatelink(connection) => {
                // No port override is expected for MySQL PrivateLink tunnels.
                assert_none!(connection.port);
                mz_mysql_util::TunnelConfig::AwsPrivatelink {
                    connection_id: connection.connection_id,
                }
            }
        };

        // Load the AWS SDK configuration when an AWS connection reference is
        // attached.
        let aws_config = match self.aws_connection.as_ref() {
            None => None,
            Some(aws_ref) => Some(
                aws_ref
                    .connection
                    .load_sdk_config(
                        &storage_configuration.connection_context,
                        aws_ref.connection_id,
                        in_task,
                    )
                    .await?,
            ),
        };

        Ok(mz_mysql_util::Config::new(
            opts,
            tunnel,
            storage_configuration.parameters.ssh_timeout_config,
            in_task,
            storage_configuration
                .parameters
                .mysql_source_timeouts
                .clone(),
            aws_config,
        )?)
    }

    /// Validates the connection by connecting and checking the MySQL system
    /// settings required for replication-based ingestion.
    pub async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<MySqlConn, MySqlConnectionValidationError> {
        let config = self
            .config(
                &storage_configuration.connection_context.secrets_reader,
                storage_configuration,
                InTask::No,
            )
            .await?;
        let mut conn = config
            .connect(
                "connection validation",
                &storage_configuration.connection_context.ssh_tunnel_manager,
            )
            .await?;

        // Collect every invalid replication setting so the user can fix them
        // all at once; any other kind of error aborts immediately.
        let mut setting_errors = vec![];
        let gtid_res = mz_mysql_util::ensure_gtid_consistency(&mut conn).await;
        let binlog_res = mz_mysql_util::ensure_full_row_binlog_format(&mut conn).await;
        let order_res = mz_mysql_util::ensure_replication_commit_order(&mut conn).await;
        for res in [gtid_res, binlog_res, order_res] {
            match res {
                Err(MySqlError::InvalidSystemSetting {
                    setting,
                    expected,
                    actual,
                }) => {
                    setting_errors.push((setting, expected, actual));
                }
                Err(err) => Err(err)?,
                Ok(()) => {}
            }
        }
        if !setting_errors.is_empty() {
            Err(MySqlConnectionValidationError::ReplicationSettingsError(
                setting_errors,
            ))?;
        }

        Ok(conn)
    }
}
2116
/// Errors that cause validation of a MySQL connection to fail.
#[derive(Debug, thiserror::Error)]
pub enum MySqlConnectionValidationError {
    /// `(setting, expected, actual)` triples for system replication
    /// settings that do not have the required values.
    #[error("Invalid MySQL system replication settings")]
    ReplicationSettingsError(Vec<(String, String, String)>),
    /// An error from the MySQL client.
    #[error(transparent)]
    Client(#[from] MySqlError),
    /// Any other error.
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}
2126
2127impl MySqlConnectionValidationError {
2128 pub fn detail(&self) -> Option<String> {
2129 match self {
2130 Self::ReplicationSettingsError(settings) => Some(format!(
2131 "Invalid MySQL system replication settings: {}",
2132 itertools::join(
2133 settings.iter().map(|(setting, expected, actual)| format!(
2134 "{}: expected {}, got {}",
2135 setting, expected, actual
2136 )),
2137 "; "
2138 )
2139 )),
2140 _ => None,
2141 }
2142 }
2143
2144 pub fn hint(&self) -> Option<String> {
2145 match self {
2146 Self::ReplicationSettingsError(_) => {
2147 Some("Set the necessary MySQL database system settings.".into())
2148 }
2149 _ => None,
2150 }
2151 }
2152}
2153
2154impl<C: ConnectionAccess> AlterCompatible for MySqlConnection<C> {
2155 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2156 let MySqlConnection {
2157 tunnel,
2158 host: _,
2160 port: _,
2161 user: _,
2162 password: _,
2163 tls_mode: _,
2164 tls_root_cert: _,
2165 tls_identity: _,
2166 aws_connection: _,
2167 } = self;
2168
2169 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2170
2171 for (compatible, field) in compatibility_checks {
2172 if !compatible {
2173 tracing::warn!(
2174 "MySqlConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2175 self,
2176 other
2177 );
2178
2179 return Err(AlterError { id });
2180 }
2181 }
2182 Ok(())
2183 }
2184}
2185
/// Details for connecting to a Microsoft SQL Server instance.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SqlServerConnectionDetails<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// The database to connect to.
    pub database: String,
    /// The username to authenticate as.
    pub user: StringOrSecret,
    /// Reference to the secret containing the password. Unlike the other
    /// connection types here, a password is required.
    pub password: CatalogItemId,
    /// A tunnel through which to route traffic.
    pub tunnel: Tunnel<C>,
    /// The level of encryption to use with the connection.
    pub encryption: mz_sql_server_util::config::EncryptionLevel,
    /// How to validate the server's TLS certificate.
    pub certificate_validation_policy: mz_sql_server_util::config::CertificateValidationPolicy,
    /// Trusted root TLS certificate, in PEM format; read when the validation
    /// policy is `VerifyCA` (see `resolve_config`).
    pub tls_root_cert: Option<StringOrSecret>,
}
2213
impl<C: ConnectionAccess> SqlServerConnectionDetails<C> {
    /// Whether this connection type is validated on creation unless the user
    /// explicitly opts out.
    fn validate_by_default(&self) -> bool {
        true
    }
}
2219
2220impl SqlServerConnectionDetails<InlinedConnection> {
2221 pub async fn validate(
2223 &self,
2224 _id: CatalogItemId,
2225 storage_configuration: &StorageConfiguration,
2226 ) -> Result<mz_sql_server_util::Client, anyhow::Error> {
2227 let config = self
2228 .resolve_config(
2229 &storage_configuration.connection_context.secrets_reader,
2230 storage_configuration,
2231 InTask::No,
2232 )
2233 .await?;
2234 tracing::debug!(?config, "Validating SQL Server connection");
2235
2236 let mut client = mz_sql_server_util::Client::connect(config).await?;
2237
2238 let mut replication_errors = vec![];
2243 for error in [
2244 mz_sql_server_util::inspect::ensure_database_cdc_enabled(&mut client).await,
2245 mz_sql_server_util::inspect::ensure_snapshot_isolation_enabled(&mut client).await,
2246 mz_sql_server_util::inspect::ensure_sql_server_agent_running(&mut client).await,
2247 ] {
2248 match error {
2249 Err(mz_sql_server_util::SqlServerError::InvalidSystemSetting {
2250 name,
2251 expected,
2252 actual,
2253 }) => replication_errors.push((name, expected, actual)),
2254 Err(other) => Err(other)?,
2255 Ok(()) => (),
2256 }
2257 }
2258 if !replication_errors.is_empty() {
2259 Err(SqlServerConnectionValidationError::ReplicationSettingsError(replication_errors))?;
2260 }
2261
2262 Ok(client)
2263 }
2264
2265 pub async fn resolve_config(
2275 &self,
2276 secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
2277 storage_configuration: &StorageConfiguration,
2278 in_task: InTask,
2279 ) -> Result<mz_sql_server_util::Config, anyhow::Error> {
2280 let dyncfg = storage_configuration.config_set();
2281 let mut inner_config = tiberius::Config::new();
2282
2283 inner_config.host(&self.host);
2285 inner_config.port(self.port);
2286 inner_config.database(self.database.clone());
2287 inner_config.encryption(self.encryption.into());
2288 match self.certificate_validation_policy {
2289 mz_sql_server_util::config::CertificateValidationPolicy::TrustAll => {
2290 inner_config.trust_cert()
2291 }
2292 mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA => {
2293 inner_config.trust_cert_ca_pem(
2294 self.tls_root_cert
2295 .as_ref()
2296 .unwrap()
2297 .get_string(in_task, secrets_reader)
2298 .await
2299 .context("ca certificate")?,
2300 );
2301 }
2302 mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem => (), }
2304
2305 inner_config.application_name("materialize");
2306
2307 let user = self
2309 .user
2310 .get_string(in_task, secrets_reader)
2311 .await
2312 .context("username")?;
2313 let password = secrets_reader
2314 .read_string_in_task_if(in_task, self.password)
2315 .await
2316 .context("password")?;
2317 inner_config.authentication(tiberius::AuthMethod::sql_server(user, password));
2320
2321 let enfoce_external_addresses = ENFORCE_EXTERNAL_ADDRESSES.get(dyncfg);
2324
2325 let tunnel = match &self.tunnel {
2326 Tunnel::Direct => mz_sql_server_util::config::TunnelConfig::Direct,
2327 Tunnel::Ssh(SshTunnel {
2328 connection_id,
2329 connection: ssh_connection,
2330 }) => {
2331 let secret = secrets_reader
2332 .read_in_task_if(in_task, *connection_id)
2333 .await
2334 .context("ssh secret")?;
2335 let key_pair = SshKeyPair::from_bytes(&secret).context("ssh key pair")?;
2336 let addresses = resolve_address(&ssh_connection.host, enfoce_external_addresses)
2339 .await
2340 .context("ssh tunnel")?;
2341
2342 let config = SshTunnelConfig {
2343 host: addresses.into_iter().map(|a| a.to_string()).collect(),
2344 port: ssh_connection.port,
2345 user: ssh_connection.user.clone(),
2346 key_pair,
2347 };
2348 mz_sql_server_util::config::TunnelConfig::Ssh {
2349 config,
2350 manager: storage_configuration
2351 .connection_context
2352 .ssh_tunnel_manager
2353 .clone(),
2354 timeout: storage_configuration.parameters.ssh_timeout_config.clone(),
2355 host: self.host.clone(),
2356 port: self.port,
2357 }
2358 }
2359 Tunnel::AwsPrivatelink(private_link_connection) => {
2360 assert_none!(private_link_connection.port);
2361 mz_sql_server_util::config::TunnelConfig::AwsPrivatelink {
2362 connection_id: private_link_connection.connection_id,
2363 port: self.port,
2364 }
2365 }
2366 };
2367
2368 Ok(mz_sql_server_util::Config::new(
2369 inner_config,
2370 tunnel,
2371 in_task,
2372 ))
2373 }
2374}
2375
/// Errors that cause validation of a SQL Server connection to fail.
#[derive(Debug, Clone, thiserror::Error)]
pub enum SqlServerConnectionValidationError {
    /// `(setting, expected, actual)` triples for system replication
    /// settings that do not have the required values.
    #[error("Invalid SQL Server system replication settings")]
    ReplicationSettingsError(Vec<(String, String, String)>),
}
2381
2382impl SqlServerConnectionValidationError {
2383 pub fn detail(&self) -> Option<String> {
2384 match self {
2385 Self::ReplicationSettingsError(settings) => Some(format!(
2386 "Invalid SQL Server system replication settings: {}",
2387 itertools::join(
2388 settings.iter().map(|(setting, expected, actual)| format!(
2389 "{}: expected {}, got {}",
2390 setting, expected, actual
2391 )),
2392 "; "
2393 )
2394 )),
2395 }
2396 }
2397
2398 pub fn hint(&self) -> Option<String> {
2399 match self {
2400 _ => None,
2401 }
2402 }
2403}
2404
2405impl<R: ConnectionResolver> IntoInlineConnection<SqlServerConnectionDetails, R>
2406 for SqlServerConnectionDetails<ReferencedConnection>
2407{
2408 fn into_inline_connection(self, r: R) -> SqlServerConnectionDetails {
2409 let SqlServerConnectionDetails {
2410 host,
2411 port,
2412 database,
2413 user,
2414 password,
2415 tunnel,
2416 encryption,
2417 certificate_validation_policy,
2418 tls_root_cert,
2419 } = self;
2420
2421 SqlServerConnectionDetails {
2422 host,
2423 port,
2424 database,
2425 user,
2426 password,
2427 tunnel: tunnel.into_inline_connection(&r),
2428 encryption,
2429 certificate_validation_policy,
2430 tls_root_cert,
2431 }
2432 }
2433}
2434
2435impl<C: ConnectionAccess> AlterCompatible for SqlServerConnectionDetails<C> {
2436 fn alter_compatible(
2437 &self,
2438 id: mz_repr::GlobalId,
2439 other: &Self,
2440 ) -> Result<(), crate::controller::AlterError> {
2441 let SqlServerConnectionDetails {
2442 tunnel,
2443 host: _,
2445 port: _,
2446 database: _,
2447 user: _,
2448 password: _,
2449 encryption: _,
2450 certificate_validation_policy: _,
2451 tls_root_cert: _,
2452 } = self;
2453
2454 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2455
2456 for (compatible, field) in compatibility_checks {
2457 if !compatible {
2458 tracing::warn!(
2459 "SqlServerConnectionDetails incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2460 self,
2461 other
2462 );
2463
2464 return Err(AlterError { id });
2465 }
2466 }
2467 Ok(())
2468 }
2469}
2470
/// Details of an SSH bastion to tunnel through. The SSH key pair itself is not
/// stored here; it is read from the secrets store keyed by the connection's
/// catalog ID.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SshConnection {
    /// Hostname of the SSH bastion.
    pub host: String,
    /// Port on which the SSH bastion listens.
    pub port: u16,
    /// Username to log in to the SSH bastion as.
    pub user: String,
}
2478
2479use self::inline::{
2480 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
2481 ReferencedConnection,
2482};
2483
impl AlterCompatible for SshConnection {
    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        // Every field of an SSH connection is permitted to change across an
        // `ALTER`, so any two values are compatible.
        Ok(())
    }
}
2490
/// Specifies an AWS PrivateLink service to tunnel through.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelink {
    /// The ID of the AWS PrivateLink connection in the catalog.
    pub connection_id: CatalogItemId,
    /// The availability zone to connect through, if specified.
    pub availability_zone: Option<String>,
    /// The port override to use when connecting, if specified.
    pub port: Option<u16>,
}
2502
2503impl AlterCompatible for AwsPrivatelink {
2504 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2505 let AwsPrivatelink {
2506 connection_id,
2507 availability_zone: _,
2508 port: _,
2509 } = self;
2510
2511 let compatibility_checks = [(connection_id == &other.connection_id, "connection_id")];
2512
2513 for (compatible, field) in compatibility_checks {
2514 if !compatible {
2515 tracing::warn!(
2516 "AwsPrivatelink incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2517 self,
2518 other
2519 );
2520
2521 return Err(AlterError { id });
2522 }
2523 }
2524
2525 Ok(())
2526 }
2527}
2528
/// Specifies an SSH tunnel through which another connection is routed.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SshTunnel<C: ConnectionAccess = InlinedConnection> {
    /// The ID of the SSH connection in the catalog; also the secrets-store key
    /// under which the tunnel's SSH key pair is read.
    pub connection_id: CatalogItemId,
    /// The SSH connection details, either inlined or still a reference
    /// depending on `C`.
    pub connection: C::Ssh,
}
2537
2538impl<R: ConnectionResolver> IntoInlineConnection<SshTunnel, R> for SshTunnel<ReferencedConnection> {
2539 fn into_inline_connection(self, r: R) -> SshTunnel {
2540 let SshTunnel {
2541 connection,
2542 connection_id,
2543 } = self;
2544
2545 SshTunnel {
2546 connection: r.resolve_connection(connection).unwrap_ssh(),
2547 connection_id,
2548 }
2549 }
2550}
2551
impl SshTunnel<InlinedConnection> {
    /// Establishes an SSH tunnel to `remote_host:remote_port` via this
    /// tunnel's bastion, returning a handle that manages the tunnel's
    /// lifetime.
    ///
    /// The bastion hostname is resolved up front (rejecting non-external
    /// addresses when `ENFORCE_EXTERNAL_ADDRESSES` is enabled), and the SSH
    /// key pair is read from the secrets store under `self.connection_id`.
    async fn connect(
        &self,
        storage_configuration: &StorageConfiguration,
        remote_host: &str,
        remote_port: u16,
        in_task: InTask,
    ) -> Result<ManagedSshTunnelHandle, anyhow::Error> {
        let resolved = resolve_address(
            &self.connection.host,
            ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
        )
        .await?;
        storage_configuration
            .connection_context
            .ssh_tunnel_manager
            .connect(
                SshTunnelConfig {
                    // All addresses the hostname resolved to are candidate
                    // bastion endpoints.
                    host: resolved
                        .iter()
                        .map(|a| a.to_string())
                        .collect::<BTreeSet<_>>(),
                    port: self.connection.port,
                    user: self.connection.user.clone(),
                    key_pair: SshKeyPair::from_bytes(
                        &storage_configuration
                            .connection_context
                            .secrets_reader
                            .read_in_task_if(in_task, self.connection_id)
                            .await?,
                    )?,
                },
                remote_host,
                remote_port,
                storage_configuration.parameters.ssh_timeout_config,
                in_task,
            )
            .await
    }
}
2595
2596impl<C: ConnectionAccess> AlterCompatible for SshTunnel<C> {
2597 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2598 let SshTunnel {
2599 connection_id,
2600 connection,
2601 } = self;
2602
2603 let compatibility_checks = [
2604 (connection_id == &other.connection_id, "connection_id"),
2605 (
2606 connection.alter_compatible(id, &other.connection).is_ok(),
2607 "connection",
2608 ),
2609 ];
2610
2611 for (compatible, field) in compatibility_checks {
2612 if !compatible {
2613 tracing::warn!(
2614 "SshTunnel incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2615 self,
2616 other
2617 );
2618
2619 return Err(AlterError { id });
2620 }
2621 }
2622
2623 Ok(())
2624 }
2625}
2626
2627impl SshConnection {
2628 #[allow(clippy::unused_async)]
2629 async fn validate(
2630 &self,
2631 id: CatalogItemId,
2632 storage_configuration: &StorageConfiguration,
2633 ) -> Result<(), anyhow::Error> {
2634 let secret = storage_configuration
2635 .connection_context
2636 .secrets_reader
2637 .read_in_task_if(
2638 InTask::No,
2640 id,
2641 )
2642 .await?;
2643 let key_pair = SshKeyPair::from_bytes(&secret)?;
2644
2645 let resolved = resolve_address(
2647 &self.host,
2648 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2649 )
2650 .await?;
2651
2652 let config = SshTunnelConfig {
2653 host: resolved
2654 .iter()
2655 .map(|a| a.to_string())
2656 .collect::<BTreeSet<_>>(),
2657 port: self.port,
2658 user: self.user.clone(),
2659 key_pair,
2660 };
2661 config
2664 .validate(storage_configuration.parameters.ssh_timeout_config)
2665 .await
2666 }
2667
2668 fn validate_by_default(&self) -> bool {
2669 false
2670 }
2671}
2672
2673impl AwsPrivatelinkConnection {
2674 #[allow(clippy::unused_async)]
2675 async fn validate(
2676 &self,
2677 id: CatalogItemId,
2678 storage_configuration: &StorageConfiguration,
2679 ) -> Result<(), anyhow::Error> {
2680 let Some(ref cloud_resource_reader) = storage_configuration
2681 .connection_context
2682 .cloud_resource_reader
2683 else {
2684 return Err(anyhow!("AWS PrivateLink connections are unsupported"));
2685 };
2686
2687 let status = cloud_resource_reader.read(id).await?;
2689
2690 let availability = status
2691 .conditions
2692 .as_ref()
2693 .and_then(|conditions| conditions.iter().find(|c| c.type_ == "Available"));
2694
2695 match availability {
2696 Some(condition) if condition.status == "True" => Ok(()),
2697 Some(condition) => Err(anyhow!("{}", condition.message)),
2698 None => Err(anyhow!("Endpoint availability is unknown")),
2699 }
2700 }
2701
2702 fn validate_by_default(&self) -> bool {
2703 false
2704 }
2705}