1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::net::SocketAddr;
15use std::sync::Arc;
16
17use anyhow::{Context, anyhow};
18use async_trait::async_trait;
19use aws_credential_types::provider::ProvideCredentials;
20use iceberg::Catalog;
21use iceberg::CatalogBuilder;
22use iceberg::io::{
23 AwsCredential, AwsCredentialLoad, CustomAwsCredentialLoader, S3_ACCESS_KEY_ID,
24 S3_DISABLE_EC2_METADATA, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use iceberg_catalog_rest::{
27 REST_CATALOG_PROP_URI, REST_CATALOG_PROP_WAREHOUSE, RestCatalogBuilder,
28};
29use itertools::Itertools;
30use mz_ccsr::tls::{Certificate, Identity};
31use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceReader, vpc_endpoint_host};
32use mz_dyncfg::ConfigSet;
33use mz_kafka_util::client::{
34 BrokerAddr, BrokerRewrite, MzClientContext, MzKafkaError, TunnelConfig, TunnelingClientContext,
35};
36use mz_mysql_util::{MySqlConn, MySqlError};
37use mz_ore::assert_none;
38use mz_ore::error::ErrorExt;
39use mz_ore::future::{InTask, OreFutureExt};
40use mz_ore::netio::resolve_address;
41use mz_ore::num::NonNeg;
42use mz_repr::{CatalogItemId, GlobalId};
43use mz_secrets::SecretsReader;
44use mz_ssh_util::keys::SshKeyPair;
45use mz_ssh_util::tunnel::SshTunnelConfig;
46use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager};
47use mz_tracing::CloneableEnvFilter;
48use rdkafka::ClientContext;
49use rdkafka::config::FromClientConfigAndContext;
50use rdkafka::consumer::{BaseConsumer, Consumer};
51use regex::Regex;
52use serde::{Deserialize, Deserializer, Serialize};
53use tokio::net;
54use tokio::runtime::Handle;
55use tokio_postgres::config::SslMode;
56use tracing::{debug, warn};
57use url::Url;
58
59use crate::AlterCompatible;
60use crate::configuration::StorageConfiguration;
61use crate::connections::aws::{
62 AwsAuth, AwsConnection, AwsConnectionReference, AwsConnectionValidationError,
63};
64use crate::connections::string_or_secret::StringOrSecret;
65use crate::controller::AlterError;
66use crate::dyncfgs::{
67 ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES,
68 KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM, KAFKA_RECONNECT_BACKOFF,
69 KAFKA_RECONNECT_BACKOFF_MAX, KAFKA_RETRY_BACKOFF, KAFKA_RETRY_BACKOFF_MAX,
70};
71use crate::errors::{ContextCreationError, CsrConnectError};
72
// Submodules: AWS connection details, connection-reference inlining, and the
// string-or-secret value type.
pub mod aws;
pub mod inline;
pub mod string_or_secret;
76
/// Property key for the scope passed to the Iceberg REST catalog builder.
const REST_CATALOG_PROP_SCOPE: &str = "scope";
/// Property key for the credential passed to the Iceberg REST catalog builder.
const REST_CATALOG_PROP_CREDENTIAL: &str = "credential";
79
/// Bridges an AWS SDK credentials provider to the `iceberg` crate's
/// [`AwsCredentialLoad`] interface so Iceberg `FileIO` can obtain credentials
/// from the SDK provider chain.
struct AwsSdkCredentialLoader {
    /// The SDK credentials provider that credential loads delegate to.
    provider: aws_credential_types::provider::SharedCredentialsProvider,
}
92
93impl AwsSdkCredentialLoader {
94 fn new(provider: aws_credential_types::provider::SharedCredentialsProvider) -> Self {
95 Self { provider }
96 }
97}
98
99#[async_trait]
100impl AwsCredentialLoad for AwsSdkCredentialLoader {
101 async fn load_credential(
102 &self,
103 _client: reqwest::Client,
104 ) -> anyhow::Result<Option<AwsCredential>> {
105 let creds = self
106 .provider
107 .provide_credentials()
108 .await
109 .map_err(|e| {
110 warn!(
111 error = %e.display_with_causes(),
112 "failed to load AWS credentials for Iceberg FileIO from SDK provider"
113 );
114 e
115 })
116 .context(
117 "failed to load AWS credentials from SDK provider for Iceberg FileIO \
118 (credential source may be temporarily unavailable)",
119 )?;
120
121 Ok(Some(AwsCredential {
122 access_key_id: creds.access_key_id().to_string(),
123 secret_access_key: creds.secret_access_key().to_string(),
124 session_token: creds.session_token().map(|s| s.to_string()),
125 expires_in: creds.expiry().map(|t| t.into()),
126 }))
127 }
128}
129
/// Extension methods for [`SecretsReader`] that optionally execute the read on
/// a dedicated task, per `in_task`.
#[async_trait::async_trait]
trait SecretsReaderExt {
    /// Reads the secret identified by `id` as raw bytes, running the read in
    /// a task when `in_task` requests it.
    async fn read_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<Vec<u8>, anyhow::Error>;

    /// Like [`SecretsReaderExt::read_in_task_if`], but decodes the secret as
    /// a string.
    async fn read_string_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<String, anyhow::Error>;
}
147
148#[async_trait::async_trait]
149impl SecretsReaderExt for Arc<dyn SecretsReader> {
150 async fn read_in_task_if(
151 &self,
152 in_task: InTask,
153 id: CatalogItemId,
154 ) -> Result<Vec<u8>, anyhow::Error> {
155 let sr = Arc::clone(self);
156 async move { sr.read(id).await }
157 .run_in_task_if(in_task, || "secrets_reader_read".to_string())
158 .await
159 }
160 async fn read_string_in_task_if(
161 &self,
162 in_task: InTask,
163 id: CatalogItemId,
164 ) -> Result<String, anyhow::Error> {
165 let sr = Arc::clone(self);
166 async move { sr.read_string(id).await }
167 .run_in_task_if(in_task, || "secrets_reader_read".to_string())
168 .await
169 }
170}
171
/// State shared by all connections: identifiers, secret and cloud-resource
/// readers, and the shared SSH tunnel manager.
#[derive(Debug, Clone)]
pub struct ConnectionContext {
    /// Environment identifier; embedded in Kafka client IDs and default
    /// progress topic names (see `KafkaConnection::{id_base, progress_topic}`).
    pub environment_id: String,
    /// Log level forwarded to librdkafka client configuration.
    pub librdkafka_log_level: tracing::Level,
    /// Prefix applied to AWS external IDs, when configured.
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    /// Role ARN used for AWS connections, when configured.
    pub aws_connection_role_arn: Option<String>,
    /// Reader used to resolve secret references held by connections.
    pub secrets_reader: Arc<dyn SecretsReader>,
    /// Reader for cloud resources (e.g. PrivateLink endpoints), when available.
    pub cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
    /// Shared SSH tunnel manager handed to Kafka client contexts.
    pub ssh_tunnel_manager: SshTunnelManager,
}
199
200impl ConnectionContext {
201 pub fn from_cli_args(
209 environment_id: String,
210 startup_log_level: &CloneableEnvFilter,
211 aws_external_id_prefix: Option<AwsExternalIdPrefix>,
212 aws_connection_role_arn: Option<String>,
213 secrets_reader: Arc<dyn SecretsReader>,
214 cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
215 ) -> ConnectionContext {
216 ConnectionContext {
217 environment_id,
218 librdkafka_log_level: mz_ore::tracing::crate_level(
219 &startup_log_level.clone().into(),
220 "librdkafka",
221 ),
222 aws_external_id_prefix,
223 aws_connection_role_arn,
224 secrets_reader,
225 cloud_resource_reader,
226 ssh_tunnel_manager: SshTunnelManager::default(),
227 }
228 }
229
230 pub fn for_tests(secrets_reader: Arc<dyn SecretsReader>) -> ConnectionContext {
232 ConnectionContext {
233 environment_id: "test-environment-id".into(),
234 librdkafka_log_level: tracing::Level::INFO,
235 aws_external_id_prefix: Some(
236 AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable(
237 "test-aws-external-id-prefix",
238 )
239 .expect("infallible"),
240 ),
241 aws_connection_role_arn: Some(
242 "arn:aws:iam::123456789000:role/MaterializeConnection".into(),
243 ),
244 secrets_reader,
245 cloud_resource_reader: None,
246 ssh_tunnel_manager: SshTunnelManager::default(),
247 }
248 }
249}
250
/// A connection to an external system, one variant per supported system.
///
/// Generic over [`ConnectionAccess`] so the same type can hold either
/// references to other connections or fully inlined connection details.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Connection<C: ConnectionAccess = InlinedConnection> {
    /// A Kafka cluster.
    Kafka(KafkaConnection<C>),
    /// A Confluent Schema Registry.
    Csr(CsrConnection<C>),
    /// A PostgreSQL server.
    Postgres(PostgresConnection<C>),
    /// An SSH bastion used for tunneling.
    Ssh(SshConnection),
    /// An AWS account/role configuration.
    Aws(AwsConnection),
    /// An AWS PrivateLink endpoint.
    AwsPrivatelink(AwsPrivatelinkConnection),
    /// A MySQL server.
    MySql(MySqlConnection<C>),
    /// A SQL Server instance.
    SqlServer(SqlServerConnectionDetails<C>),
    /// An Iceberg catalog.
    IcebergCatalog(IcebergCatalogConnection<C>),
}
263
264impl<R: ConnectionResolver> IntoInlineConnection<Connection, R>
265 for Connection<ReferencedConnection>
266{
267 fn into_inline_connection(self, r: R) -> Connection {
268 match self {
269 Connection::Kafka(kafka) => Connection::Kafka(kafka.into_inline_connection(r)),
270 Connection::Csr(csr) => Connection::Csr(csr.into_inline_connection(r)),
271 Connection::Postgres(pg) => Connection::Postgres(pg.into_inline_connection(r)),
272 Connection::Ssh(ssh) => Connection::Ssh(ssh),
273 Connection::Aws(aws) => Connection::Aws(aws),
274 Connection::AwsPrivatelink(awspl) => Connection::AwsPrivatelink(awspl),
275 Connection::MySql(mysql) => Connection::MySql(mysql.into_inline_connection(r)),
276 Connection::SqlServer(sql_server) => {
277 Connection::SqlServer(sql_server.into_inline_connection(r))
278 }
279 Connection::IcebergCatalog(iceberg) => {
280 Connection::IcebergCatalog(iceberg.into_inline_connection(r))
281 }
282 }
283 }
284}
285
286impl<C: ConnectionAccess> Connection<C> {
287 pub fn validate_by_default(&self) -> bool {
289 match self {
290 Connection::Kafka(conn) => conn.validate_by_default(),
291 Connection::Csr(conn) => conn.validate_by_default(),
292 Connection::Postgres(conn) => conn.validate_by_default(),
293 Connection::Ssh(conn) => conn.validate_by_default(),
294 Connection::Aws(conn) => conn.validate_by_default(),
295 Connection::AwsPrivatelink(conn) => conn.validate_by_default(),
296 Connection::MySql(conn) => conn.validate_by_default(),
297 Connection::SqlServer(conn) => conn.validate_by_default(),
298 Connection::IcebergCatalog(conn) => conn.validate_by_default(),
299 }
300 }
301}
302
303impl Connection<InlinedConnection> {
304 pub async fn validate(
306 &self,
307 id: CatalogItemId,
308 storage_configuration: &StorageConfiguration,
309 ) -> Result<(), ConnectionValidationError> {
310 match self {
311 Connection::Kafka(conn) => conn.validate(id, storage_configuration).await?,
312 Connection::Csr(conn) => conn.validate(id, storage_configuration).await?,
313 Connection::Postgres(conn) => {
314 conn.validate(id, storage_configuration).await?;
315 }
316 Connection::Ssh(conn) => conn.validate(id, storage_configuration).await?,
317 Connection::Aws(conn) => conn.validate(id, storage_configuration).await?,
318 Connection::AwsPrivatelink(conn) => conn.validate(id, storage_configuration).await?,
319 Connection::MySql(conn) => {
320 conn.validate(id, storage_configuration).await?;
321 }
322 Connection::SqlServer(conn) => {
323 conn.validate(id, storage_configuration).await?;
324 }
325 Connection::IcebergCatalog(conn) => conn.validate(id, storage_configuration).await?,
326 }
327 Ok(())
328 }
329
330 pub fn unwrap_kafka(self) -> <InlinedConnection as ConnectionAccess>::Kafka {
331 match self {
332 Self::Kafka(conn) => conn,
333 o => unreachable!("{o:?} is not a Kafka connection"),
334 }
335 }
336
337 pub fn unwrap_pg(self) -> <InlinedConnection as ConnectionAccess>::Pg {
338 match self {
339 Self::Postgres(conn) => conn,
340 o => unreachable!("{o:?} is not a Postgres connection"),
341 }
342 }
343
344 pub fn unwrap_mysql(self) -> <InlinedConnection as ConnectionAccess>::MySql {
345 match self {
346 Self::MySql(conn) => conn,
347 o => unreachable!("{o:?} is not a MySQL connection"),
348 }
349 }
350
351 pub fn unwrap_sql_server(self) -> <InlinedConnection as ConnectionAccess>::SqlServer {
352 match self {
353 Self::SqlServer(conn) => conn,
354 o => unreachable!("{o:?} is not a SQL Server connection"),
355 }
356 }
357
358 pub fn unwrap_aws(self) -> <InlinedConnection as ConnectionAccess>::Aws {
359 match self {
360 Self::Aws(conn) => conn,
361 o => unreachable!("{o:?} is not an AWS connection"),
362 }
363 }
364
365 pub fn unwrap_ssh(self) -> <InlinedConnection as ConnectionAccess>::Ssh {
366 match self {
367 Self::Ssh(conn) => conn,
368 o => unreachable!("{o:?} is not an SSH connection"),
369 }
370 }
371
372 pub fn unwrap_csr(self) -> <InlinedConnection as ConnectionAccess>::Csr {
373 match self {
374 Self::Csr(conn) => conn,
375 o => unreachable!("{o:?} is not a Kafka connection"),
376 }
377 }
378
379 pub fn unwrap_iceberg_catalog(self) -> <InlinedConnection as ConnectionAccess>::IcebergCatalog {
380 match self {
381 Self::IcebergCatalog(conn) => conn,
382 o => unreachable!("{o:?} is not an Iceberg catalog connection"),
383 }
384 }
385}
386
/// An error returned when validating a connection.
#[derive(thiserror::Error, Debug)]
pub enum ConnectionValidationError {
    /// A PostgreSQL-specific validation error.
    #[error(transparent)]
    Postgres(#[from] PostgresConnectionValidationError),
    /// A MySQL-specific validation error.
    #[error(transparent)]
    MySql(#[from] MySqlConnectionValidationError),
    /// A SQL Server-specific validation error.
    #[error(transparent)]
    SqlServer(#[from] SqlServerConnectionValidationError),
    /// An AWS-specific validation error.
    #[error(transparent)]
    Aws(#[from] AwsConnectionValidationError),
    /// Any other validation failure, rendered with its causes.
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}
401
402impl ConnectionValidationError {
403 pub fn detail(&self) -> Option<String> {
405 match self {
406 ConnectionValidationError::Postgres(e) => e.detail(),
407 ConnectionValidationError::MySql(e) => e.detail(),
408 ConnectionValidationError::SqlServer(e) => e.detail(),
409 ConnectionValidationError::Aws(e) => e.detail(),
410 ConnectionValidationError::Other(_) => None,
411 }
412 }
413
414 pub fn hint(&self) -> Option<String> {
416 match self {
417 ConnectionValidationError::Postgres(e) => e.hint(),
418 ConnectionValidationError::MySql(e) => e.hint(),
419 ConnectionValidationError::SqlServer(e) => e.hint(),
420 ConnectionValidationError::Aws(e) => e.hint(),
421 ConnectionValidationError::Other(_) => None,
422 }
423 }
424}
425
impl<C: ConnectionAccess> AlterCompatible for Connection<C> {
    /// Delegates compatibility checks to the matching variant pair; any
    /// variant mismatch — and any pairing without an explicit arm below —
    /// is logged and reported incompatible.
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        match (self, other) {
            (Self::Aws(s), Self::Aws(o)) => s.alter_compatible(id, o),
            (Self::AwsPrivatelink(s), Self::AwsPrivatelink(o)) => s.alter_compatible(id, o),
            (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o),
            (Self::Csr(s), Self::Csr(o)) => s.alter_compatible(id, o),
            (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
            (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
            (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
            // NOTE(review): SqlServer and IcebergCatalog pairs fall through to
            // this catch-all and are therefore always incompatible. For
            // IcebergCatalog this matches its own `alter_compatible` (always
            // Err); confirm the missing SqlServer arm is intentional.
            _ => {
                tracing::warn!(
                    "Connection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );
                Err(AlterError { id })
            }
        }
    }
}
447
/// Configuration for a plain Iceberg REST catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct RestIcebergCatalog {
    /// Credential forwarded to the catalog's `credential` property.
    pub credential: StringOrSecret,
    /// Optional scope forwarded to the catalog's `scope` property.
    pub scope: Option<String>,
    /// Optional warehouse forwarded to the catalog's warehouse property.
    pub warehouse: Option<String>,
}
457
/// Configuration for an AWS S3 Tables Iceberg REST catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct S3TablesRestIcebergCatalog<C: ConnectionAccess = InlinedConnection> {
    /// The AWS connection used to authenticate against S3 Tables.
    pub aws_connection: AwsConnectionReference<C>,
    /// The warehouse forwarded to the REST catalog (required for S3 Tables).
    pub warehouse: String,
}
465
466impl<R: ConnectionResolver> IntoInlineConnection<S3TablesRestIcebergCatalog, R>
467 for S3TablesRestIcebergCatalog<ReferencedConnection>
468{
469 fn into_inline_connection(self, r: R) -> S3TablesRestIcebergCatalog {
470 S3TablesRestIcebergCatalog {
471 aws_connection: self.aws_connection.into_inline_connection(&r),
472 warehouse: self.warehouse,
473 }
474 }
475}
476
/// Discriminates the supported Iceberg catalog flavors.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogType {
    /// A plain Iceberg REST catalog.
    Rest,
    /// An AWS S3 Tables Iceberg REST catalog.
    S3TablesRest,
}
482
/// Flavor-specific Iceberg catalog configuration.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogImpl<C: ConnectionAccess = InlinedConnection> {
    /// Configuration for a plain REST catalog.
    Rest(RestIcebergCatalog),
    /// Configuration for an AWS S3 Tables REST catalog.
    S3TablesRest(S3TablesRestIcebergCatalog<C>),
}
488
489impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogImpl, R>
490 for IcebergCatalogImpl<ReferencedConnection>
491{
492 fn into_inline_connection(self, r: R) -> IcebergCatalogImpl {
493 match self {
494 IcebergCatalogImpl::Rest(rest) => IcebergCatalogImpl::Rest(rest),
495 IcebergCatalogImpl::S3TablesRest(s3tables) => {
496 IcebergCatalogImpl::S3TablesRest(s3tables.into_inline_connection(r))
497 }
498 }
499 }
500}
501
/// A connection to an Iceberg catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergCatalogConnection<C: ConnectionAccess = InlinedConnection> {
    /// The flavor-specific catalog configuration.
    pub catalog: IcebergCatalogImpl<C>,
    /// Base URI of the catalog's REST endpoint.
    pub uri: reqwest::Url,
}
509
impl AlterCompatible for IcebergCatalogConnection {
    /// Iceberg catalog connections are never considered alter-compatible:
    /// every check yields an [`AlterError`], regardless of `_other`.
    fn alter_compatible(&self, id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        Err(AlterError { id })
    }
}
515
516impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogConnection, R>
517 for IcebergCatalogConnection<ReferencedConnection>
518{
519 fn into_inline_connection(self, r: R) -> IcebergCatalogConnection {
520 IcebergCatalogConnection {
521 catalog: self.catalog.into_inline_connection(&r),
522 uri: self.uri,
523 }
524 }
525}
526
impl<C: ConnectionAccess> IcebergCatalogConnection<C> {
    /// Iceberg catalog connections are always validated by default.
    fn validate_by_default(&self) -> bool {
        true
    }
}
532
533impl IcebergCatalogConnection<InlinedConnection> {
534 pub async fn connect(
535 &self,
536 storage_configuration: &StorageConfiguration,
537 in_task: InTask,
538 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
539 match self.catalog {
540 IcebergCatalogImpl::S3TablesRest(ref s3tables) => {
541 self.connect_s3tables(s3tables, storage_configuration, in_task)
542 .await
543 }
544 IcebergCatalogImpl::Rest(ref rest) => {
545 self.connect_rest(rest, storage_configuration, in_task)
546 .await
547 }
548 }
549 }
550
551 pub fn catalog_type(&self) -> IcebergCatalogType {
552 match self.catalog {
553 IcebergCatalogImpl::S3TablesRest(_) => IcebergCatalogType::S3TablesRest,
554 IcebergCatalogImpl::Rest(_) => IcebergCatalogType::Rest,
555 }
556 }
557
558 pub fn s3tables_catalog(&self) -> Option<&S3TablesRestIcebergCatalog> {
559 match &self.catalog {
560 IcebergCatalogImpl::S3TablesRest(s3tables) => Some(s3tables),
561 IcebergCatalogImpl::Rest(_) => None,
562 }
563 }
564
565 pub fn rest_catalog(&self) -> Option<&RestIcebergCatalog> {
566 match &self.catalog {
567 IcebergCatalogImpl::Rest(rest) => Some(rest),
568 IcebergCatalogImpl::S3TablesRest(_) => None,
569 }
570 }
571
572 async fn connect_s3tables(
573 &self,
574 s3tables: &S3TablesRestIcebergCatalog,
575 storage_configuration: &StorageConfiguration,
576 in_task: InTask,
577 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
578 let secret_reader = &storage_configuration.connection_context.secrets_reader;
579 let aws_ref = &s3tables.aws_connection;
580 let aws_config = aws_ref
581 .connection
582 .load_sdk_config(
583 &storage_configuration.connection_context,
584 aws_ref.connection_id,
585 in_task,
586 )
587 .await
588 .with_context(|| {
589 format!(
590 "failed to load AWS SDK config for S3 Tables Iceberg catalog \
591 (connection id: {}, auth method: {}, catalog uri: {}, warehouse: {})",
592 aws_ref.connection_id,
593 aws_ref.connection.auth_method(),
594 self.uri,
595 s3tables.warehouse
596 )
597 })?;
598
599 let aws_region = aws_ref
600 .connection
601 .region
602 .clone()
603 .unwrap_or_else(|| "us-east-1".to_string());
604
605 let mut props = vec![
606 (S3_REGION.to_string(), aws_region),
607 (S3_DISABLE_EC2_METADATA.to_string(), "true".to_string()),
608 (
609 REST_CATALOG_PROP_WAREHOUSE.to_string(),
610 s3tables.warehouse.clone(),
611 ),
612 (REST_CATALOG_PROP_URI.to_string(), self.uri.to_string()),
613 ];
614
615 let aws_auth = aws_ref.connection.auth.clone();
616
617 if let AwsAuth::Credentials(creds) = &aws_auth {
618 props.push((
619 S3_ACCESS_KEY_ID.to_string(),
620 creds
621 .access_key_id
622 .get_string(in_task, secret_reader)
623 .await?,
624 ));
625 props.push((
626 S3_SECRET_ACCESS_KEY.to_string(),
627 secret_reader.read_string(creds.secret_access_key).await?,
628 ));
629 }
630
631 let catalog = RestCatalogBuilder::default()
635 .with_aws_config(aws_config.clone())
636 .load("IcebergCatalog", props.into_iter().collect())
637 .await
638 .with_context(|| {
639 format!(
640 "failed to create S3 Tables Iceberg catalog \
641 (connection id: {}, catalog uri: {}, warehouse: {})",
642 aws_ref.connection_id, self.uri, s3tables.warehouse
643 )
644 })?;
645
646 let catalog = if matches!(aws_auth, AwsAuth::AssumeRole(_)) {
647 let credentials_provider = aws_config
648 .credentials_provider()
649 .ok_or_else(|| anyhow!("aws_config missing credentials provider"))?;
650 let file_io_loader = CustomAwsCredentialLoader::new(Arc::new(
651 AwsSdkCredentialLoader::new(credentials_provider),
652 ));
653 catalog.with_file_io_extension(file_io_loader)
654 } else {
655 catalog
656 };
657
658 Ok(Arc::new(catalog))
659 }
660
661 async fn connect_rest(
662 &self,
663 rest: &RestIcebergCatalog,
664 storage_configuration: &StorageConfiguration,
665 in_task: InTask,
666 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
667 let mut props = BTreeMap::from([(
668 REST_CATALOG_PROP_URI.to_string(),
669 self.uri.to_string().clone(),
670 )]);
671
672 if let Some(warehouse) = &rest.warehouse {
673 props.insert(REST_CATALOG_PROP_WAREHOUSE.to_string(), warehouse.clone());
674 }
675
676 let credential = rest
677 .credential
678 .get_string(
679 in_task,
680 &storage_configuration.connection_context.secrets_reader,
681 )
682 .await
683 .map_err(|e| anyhow!("failed to read Iceberg catalog credential: {e}"))?;
684 props.insert(REST_CATALOG_PROP_CREDENTIAL.to_string(), credential);
685
686 if let Some(scope) = &rest.scope {
687 props.insert(REST_CATALOG_PROP_SCOPE.to_string(), scope.clone());
688 }
689
690 let catalog = RestCatalogBuilder::default()
691 .load("IcebergCatalog", props.into_iter().collect())
692 .await
693 .map_err(|e| anyhow!("failed to create Iceberg catalog: {e}"))?;
694 Ok(Arc::new(catalog))
695 }
696
697 async fn validate(
698 &self,
699 _id: CatalogItemId,
700 storage_configuration: &StorageConfiguration,
701 ) -> Result<(), ConnectionValidationError> {
702 let catalog = self
703 .connect(storage_configuration, InTask::No)
704 .await
705 .map_err(|e| {
706 ConnectionValidationError::Other(anyhow!("failed to connect to catalog: {e}"))
707 })?;
708
709 catalog.list_namespaces(None).await.map_err(|e| {
711 ConnectionValidationError::Other(anyhow!("failed to list namespaces: {e}"))
712 })?;
713
714 Ok(())
715 }
716}
717
/// An AWS PrivateLink endpoint service to connect through.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelinkConnection {
    /// Name of the PrivateLink endpoint service.
    pub service_name: String,
    /// Availability zones in which the service is offered.
    pub availability_zones: Vec<String>,
}
723
impl AlterCompatible for AwsPrivatelinkConnection {
    /// Any two AWS PrivateLink connections are considered alter-compatible;
    /// this check unconditionally succeeds.
    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        Ok(())
    }
}
730
/// TLS settings for a Kafka connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaTlsConfig {
    /// Optional client identity presented to the broker
    /// (`ssl.key.pem`/`ssl.certificate.pem`).
    pub identity: Option<TlsIdentity>,
    /// Optional CA certificate used to verify the broker (`ssl.ca.pem`).
    pub root_cert: Option<StringOrSecret>,
}
736
/// SASL settings for a Kafka connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaSaslConfig<C: ConnectionAccess = InlinedConnection> {
    /// SASL mechanism name, forwarded as `sasl.mechanisms`.
    pub mechanism: String,
    /// Username, forwarded as `sasl.username`.
    pub username: StringOrSecret,
    /// Optional secret holding the password (`sasl.password`).
    pub password: Option<CatalogItemId>,
    /// Optional AWS connection whose SDK config is loaded for the client
    /// context when present.
    pub aws: Option<AwsConnectionReference<C>>,
}
744
745impl<R: ConnectionResolver> IntoInlineConnection<KafkaSaslConfig, R>
746 for KafkaSaslConfig<ReferencedConnection>
747{
748 fn into_inline_connection(self, r: R) -> KafkaSaslConfig {
749 KafkaSaslConfig {
750 mechanism: self.mechanism,
751 username: self.username,
752 password: self.password,
753 aws: self.aws.map(|aws| aws.into_inline_connection(&r)),
754 }
755 }
756}
757
/// A single Kafka broker and how to reach it.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaBroker<C: ConnectionAccess = InlinedConnection> {
    /// Broker address as `host[:port]`; the port defaults to 9092 when
    /// omitted (see `KafkaConnection::create_with_context`).
    pub address: String,
    /// Tunnel used to reach this broker.
    pub tunnel: Tunnel<C>,
}
766
767impl<R: ConnectionResolver> IntoInlineConnection<KafkaBroker, R>
768 for KafkaBroker<ReferencedConnection>
769{
770 fn into_inline_connection(self, r: R) -> KafkaBroker {
771 let KafkaBroker { address, tunnel } = self;
772 KafkaBroker {
773 address,
774 tunnel: tunnel.into_inline_connection(r),
775 }
776 }
777}
778
/// Options applied when creating a Kafka topic (e.g. the progress topic).
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Default)]
pub struct KafkaTopicOptions {
    /// Replication factor for the topic, if specified.
    pub replication_factor: Option<NonNeg<i32>>,
    /// Partition count for the topic, if specified.
    pub partition_count: Option<NonNeg<i32>>,
    /// Additional topic-level configuration entries.
    pub topic_config: BTreeMap<String, String>,
}
790
/// Details for connecting to a Kafka cluster.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaConnection<C: ConnectionAccess = InlinedConnection> {
    /// Brokers to bootstrap from; must be empty when the default tunnel is
    /// AWS PrivateLink (see `create_with_context`).
    pub brokers: Vec<KafkaBroker<C>>,
    /// Tunnel applied by default, i.e. for brokers without their own tunnel.
    pub default_tunnel: Tunnel<C>,
    /// User-specified progress topic name, if any; otherwise a default is
    /// derived (see `progress_topic`).
    pub progress_topic: Option<String>,
    /// Options used when creating the progress topic.
    pub progress_topic_options: KafkaTopicOptions,
    /// Additional librdkafka options, possibly secret-valued.
    pub options: BTreeMap<String, StringOrSecret>,
    /// TLS configuration, if any.
    pub tls: Option<KafkaTlsConfig>,
    /// SASL configuration, if any.
    pub sasl: Option<KafkaSaslConfig<C>>,
}
804
805impl<R: ConnectionResolver> IntoInlineConnection<KafkaConnection, R>
806 for KafkaConnection<ReferencedConnection>
807{
808 fn into_inline_connection(self, r: R) -> KafkaConnection {
809 let KafkaConnection {
810 brokers,
811 progress_topic,
812 progress_topic_options,
813 default_tunnel,
814 options,
815 tls,
816 sasl,
817 } = self;
818
819 let brokers = brokers
820 .into_iter()
821 .map(|broker| broker.into_inline_connection(&r))
822 .collect();
823
824 KafkaConnection {
825 brokers,
826 progress_topic,
827 progress_topic_options,
828 default_tunnel: default_tunnel.into_inline_connection(&r),
829 options,
830 tls,
831 sasl: sasl.map(|sasl| sasl.into_inline_connection(&r)),
832 }
833 }
834}
835
836impl<C: ConnectionAccess> KafkaConnection<C> {
837 pub fn progress_topic(
842 &self,
843 connection_context: &ConnectionContext,
844 connection_id: CatalogItemId,
845 ) -> Cow<'_, str> {
846 if let Some(progress_topic) = &self.progress_topic {
847 Cow::Borrowed(progress_topic)
848 } else {
849 Cow::Owned(format!(
850 "_materialize-progress-{}-{}",
851 connection_context.environment_id, connection_id,
852 ))
853 }
854 }
855
856 fn validate_by_default(&self) -> bool {
857 true
858 }
859}
860
861impl KafkaConnection {
862 pub fn id_base(
866 connection_context: &ConnectionContext,
867 connection_id: CatalogItemId,
868 object_id: GlobalId,
869 ) -> String {
870 format!(
871 "materialize-{}-{}-{}",
872 connection_context.environment_id, connection_id, object_id,
873 )
874 }
875
876 pub fn enrich_client_id(&self, configs: &ConfigSet, client_id: &mut String) {
879 #[derive(Debug, Deserialize)]
880 struct EnrichmentRule {
881 #[serde(deserialize_with = "deserialize_regex")]
882 pattern: Regex,
883 payload: String,
884 }
885
886 fn deserialize_regex<'de, D>(deserializer: D) -> Result<Regex, D::Error>
887 where
888 D: Deserializer<'de>,
889 {
890 let buf = String::deserialize(deserializer)?;
891 Regex::new(&buf).map_err(serde::de::Error::custom)
892 }
893
894 let rules = KAFKA_CLIENT_ID_ENRICHMENT_RULES.get(configs);
895 let rules = match serde_json::from_value::<Vec<EnrichmentRule>>(rules) {
896 Ok(rules) => rules,
897 Err(e) => {
898 warn!(%e, "failed to decode kafka_client_id_enrichment_rules");
899 return;
900 }
901 };
902
903 debug!(?self.brokers, "evaluating client ID enrichment rules");
908 for rule in rules {
909 let is_match = self
910 .brokers
911 .iter()
912 .any(|b| rule.pattern.is_match(&b.address));
913 debug!(?rule, is_match, "evaluated client ID enrichment rule");
914 if is_match {
915 client_id.push('-');
916 client_id.push_str(&rule.payload);
917 }
918 }
919 }
920
921 pub async fn create_with_context<C, T>(
923 &self,
924 storage_configuration: &StorageConfiguration,
925 context: C,
926 extra_options: &BTreeMap<&str, String>,
927 in_task: InTask,
928 ) -> Result<T, ContextCreationError>
929 where
930 C: ClientContext,
931 T: FromClientConfigAndContext<TunnelingClientContext<C>>,
932 {
933 let mut options = self.options.clone();
934
935 options.insert("allow.auto.create.topics".into(), "false".into());
940
941 let brokers = match &self.default_tunnel {
942 Tunnel::AwsPrivatelink(t) => {
943 assert!(&self.brokers.is_empty());
944
945 let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM
946 .get(storage_configuration.config_set());
947 options.insert("ssl.endpoint.identification.algorithm".into(), algo.into());
948
949 format!(
952 "{}:{}",
953 vpc_endpoint_host(
954 t.connection_id,
955 None, ),
957 t.port.unwrap_or(9092)
958 )
959 }
960 _ => self.brokers.iter().map(|b| &b.address).join(","),
961 };
962 options.insert("bootstrap.servers".into(), brokers.into());
963 let security_protocol = match (self.tls.is_some(), self.sasl.is_some()) {
964 (false, false) => "PLAINTEXT",
965 (true, false) => "SSL",
966 (false, true) => "SASL_PLAINTEXT",
967 (true, true) => "SASL_SSL",
968 };
969 options.insert("security.protocol".into(), security_protocol.into());
970 if let Some(tls) = &self.tls {
971 if let Some(root_cert) = &tls.root_cert {
972 options.insert("ssl.ca.pem".into(), root_cert.clone());
973 }
974 if let Some(identity) = &tls.identity {
975 options.insert("ssl.key.pem".into(), StringOrSecret::Secret(identity.key));
976 options.insert("ssl.certificate.pem".into(), identity.cert.clone());
977 }
978 }
979 if let Some(sasl) = &self.sasl {
980 options.insert("sasl.mechanisms".into(), (&sasl.mechanism).into());
981 options.insert("sasl.username".into(), sasl.username.clone());
982 if let Some(password) = sasl.password {
983 options.insert("sasl.password".into(), StringOrSecret::Secret(password));
984 }
985 }
986
987 options.insert(
988 "retry.backoff.ms".into(),
989 KAFKA_RETRY_BACKOFF
990 .get(storage_configuration.config_set())
991 .as_millis()
992 .into(),
993 );
994 options.insert(
995 "retry.backoff.max.ms".into(),
996 KAFKA_RETRY_BACKOFF_MAX
997 .get(storage_configuration.config_set())
998 .as_millis()
999 .into(),
1000 );
1001 options.insert(
1002 "reconnect.backoff.ms".into(),
1003 KAFKA_RECONNECT_BACKOFF
1004 .get(storage_configuration.config_set())
1005 .as_millis()
1006 .into(),
1007 );
1008 options.insert(
1009 "reconnect.backoff.max.ms".into(),
1010 KAFKA_RECONNECT_BACKOFF_MAX
1011 .get(storage_configuration.config_set())
1012 .as_millis()
1013 .into(),
1014 );
1015
1016 let mut config = mz_kafka_util::client::create_new_client_config(
1017 storage_configuration
1018 .connection_context
1019 .librdkafka_log_level,
1020 storage_configuration.parameters.kafka_timeout_config,
1021 );
1022 for (k, v) in options {
1023 config.set(
1024 k,
1025 v.get_string(
1026 in_task,
1027 &storage_configuration.connection_context.secrets_reader,
1028 )
1029 .await
1030 .context("reading kafka secret")?,
1031 );
1032 }
1033 for (k, v) in extra_options {
1034 config.set(*k, v);
1035 }
1036
1037 let aws_config = match self.sasl.as_ref().and_then(|sasl| sasl.aws.as_ref()) {
1038 None => None,
1039 Some(aws) => Some(
1040 aws.connection
1041 .load_sdk_config(
1042 &storage_configuration.connection_context,
1043 aws.connection_id,
1044 in_task,
1045 )
1046 .await?,
1047 ),
1048 };
1049
1050 let mut context = TunnelingClientContext::new(
1054 context,
1055 Handle::current(),
1056 storage_configuration
1057 .connection_context
1058 .ssh_tunnel_manager
1059 .clone(),
1060 storage_configuration.parameters.ssh_timeout_config,
1061 aws_config,
1062 in_task,
1063 );
1064
1065 match &self.default_tunnel {
1066 Tunnel::Direct => {
1067 }
1069 Tunnel::AwsPrivatelink(pl) => {
1070 context.set_default_tunnel(TunnelConfig::StaticHost(vpc_endpoint_host(
1071 pl.connection_id,
1072 None, )));
1074 }
1075 Tunnel::Ssh(ssh_tunnel) => {
1076 let secret = storage_configuration
1077 .connection_context
1078 .secrets_reader
1079 .read_in_task_if(in_task, ssh_tunnel.connection_id)
1080 .await?;
1081 let key_pair = SshKeyPair::from_bytes(&secret)?;
1082
1083 let resolved = resolve_address(
1085 &ssh_tunnel.connection.host,
1086 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1087 )
1088 .await?;
1089 context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig {
1090 host: resolved
1091 .iter()
1092 .map(|a| a.to_string())
1093 .collect::<BTreeSet<_>>(),
1094 port: ssh_tunnel.connection.port,
1095 user: ssh_tunnel.connection.user.clone(),
1096 key_pair,
1097 }));
1098 }
1099 }
1100
1101 for broker in &self.brokers {
1102 let mut addr_parts = broker.address.splitn(2, ':');
1103 let addr = BrokerAddr {
1104 host: addr_parts
1105 .next()
1106 .context("BROKER is not address:port")?
1107 .into(),
1108 port: addr_parts
1109 .next()
1110 .unwrap_or("9092")
1111 .parse()
1112 .context("parsing BROKER port")?,
1113 };
1114 match &broker.tunnel {
1115 Tunnel::Direct => {
1116 }
1126 Tunnel::AwsPrivatelink(aws_privatelink) => {
1127 let host = mz_cloud_resources::vpc_endpoint_host(
1128 aws_privatelink.connection_id,
1129 aws_privatelink.availability_zone.as_deref(),
1130 );
1131 let port = aws_privatelink.port;
1132 context.add_broker_rewrite(
1133 addr,
1134 BrokerRewrite {
1135 host: host.clone(),
1136 port,
1137 },
1138 );
1139 }
1140 Tunnel::Ssh(ssh_tunnel) => {
1141 let ssh_host_resolved = resolve_address(
1143 &ssh_tunnel.connection.host,
1144 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1145 )
1146 .await?;
1147 context
1148 .add_ssh_tunnel(
1149 addr,
1150 SshTunnelConfig {
1151 host: ssh_host_resolved
1152 .iter()
1153 .map(|a| a.to_string())
1154 .collect::<BTreeSet<_>>(),
1155 port: ssh_tunnel.connection.port,
1156 user: ssh_tunnel.connection.user.clone(),
1157 key_pair: SshKeyPair::from_bytes(
1158 &storage_configuration
1159 .connection_context
1160 .secrets_reader
1161 .read_in_task_if(in_task, ssh_tunnel.connection_id)
1162 .await?,
1163 )?,
1164 },
1165 )
1166 .await
1167 .map_err(ContextCreationError::Ssh)?;
1168 }
1169 }
1170 }
1171
1172 Ok(config.create_with_context(context)?)
1173 }
1174
    /// Validates the Kafka connection by attempting to fetch cluster metadata
    /// from the brokers.
    ///
    /// Any errors reported by the underlying client context are preferred over
    /// the generic metadata-fetch error, since they are usually more specific.
    async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<(), anyhow::Error> {
        // Capture client-level errors on a side channel so we can surface the
        // most informative one if the metadata fetch fails.
        let (context, error_rx) = MzClientContext::with_errors();
        let consumer: BaseConsumer<_> = self
            .create_with_context(
                storage_configuration,
                context,
                &BTreeMap::new(),
                InTask::No,
            )
            .await?;
        let consumer = Arc::new(consumer);

        let timeout = storage_configuration
            .parameters
            .kafka_timeout_config
            .fetch_metadata_timeout;

        // `fetch_metadata` is a blocking rdkafka call, so run it off the async
        // executor.
        let result = mz_ore::task::spawn_blocking(|| "kafka_get_metadata", {
            let consumer = Arc::clone(&consumer);
            move || consumer.fetch_metadata(None, timeout)
        })
        .await;
        match result {
            Ok(_) => Ok(()),
            Err(err) => {
                // Drain the error channel, preferring any non-internal error
                // over internal ones, as it is likelier to name the root cause
                // (e.g. a TLS or SASL failure).
                let main_err = error_rx.try_iter().reduce(|cur, new| match cur {
                    MzKafkaError::Internal(_) => new,
                    _ => cur,
                });

                // Drop the consumer before reporting, releasing its resources
                // (connections, background activity) eagerly.
                drop(consumer);

                match main_err {
                    Some(err) => Err(err.into()),
                    None => Err(err.into()),
                }
            }
        }
    }
1239}
1240
1241impl<C: ConnectionAccess> AlterCompatible for KafkaConnection<C> {
1242 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1243 let KafkaConnection {
1244 brokers: _,
1245 default_tunnel: _,
1246 progress_topic,
1247 progress_topic_options,
1248 options: _,
1249 tls: _,
1250 sasl: _,
1251 } = self;
1252
1253 let compatibility_checks = [
1254 (progress_topic == &other.progress_topic, "progress_topic"),
1255 (
1256 progress_topic_options == &other.progress_topic_options,
1257 "progress_topic_options",
1258 ),
1259 ];
1260
1261 for (compatible, field) in compatibility_checks {
1262 if !compatible {
1263 tracing::warn!(
1264 "KafkaConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1265 self,
1266 other
1267 );
1268
1269 return Err(AlterError { id });
1270 }
1271 }
1272
1273 Ok(())
1274 }
1275}
1276
/// A connection to a Confluent Schema Registry.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnection<C: ConnectionAccess = InlinedConnection> {
    /// The URL of the schema registry.
    pub url: Url,
    /// Trusted root TLS certificate, in PEM format.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate/key pair for authentication with the
    /// schema registry.
    pub tls_identity: Option<TlsIdentity>,
    /// Optional HTTP basic-auth credentials for the schema registry.
    pub http_auth: Option<CsrConnectionHttpAuth>,
    /// How to route network traffic to the schema registry.
    pub tunnel: Tunnel<C>,
}
1292
1293impl<R: ConnectionResolver> IntoInlineConnection<CsrConnection, R>
1294 for CsrConnection<ReferencedConnection>
1295{
1296 fn into_inline_connection(self, r: R) -> CsrConnection {
1297 let CsrConnection {
1298 url,
1299 tls_root_cert,
1300 tls_identity,
1301 http_auth,
1302 tunnel,
1303 } = self;
1304 CsrConnection {
1305 url,
1306 tls_root_cert,
1307 tls_identity,
1308 http_auth,
1309 tunnel: tunnel.into_inline_connection(r),
1310 }
1311 }
1312}
1313
impl<C: ConnectionAccess> CsrConnection<C> {
    // Whether this connection type should be validated by default; always true
    // for CSR connections. (Exact caller semantics live outside this file —
    // presumably the CREATE CONNECTION path; verify against callers.)
    fn validate_by_default(&self) -> bool {
        true
    }
}
1319
impl CsrConnection {
    /// Builds a schema registry client for this connection, reading any
    /// secrets (TLS key, HTTP password) via the configured secrets reader and
    /// wiring up the configured tunnel.
    ///
    /// `in_task` controls whether secret reads and address resolution run in
    /// a separate task.
    pub async fn connect(
        &self,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_ccsr::Client, CsrConnectError> {
        let mut client_config = mz_ccsr::ClientConfig::new(self.url.clone());
        if let Some(root_cert) = &self.tls_root_cert {
            let root_cert = root_cert
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let root_cert = Certificate::from_pem(root_cert.as_bytes())?;
            client_config = client_config.add_root_certificate(root_cert);
        }

        if let Some(tls_identity) = &self.tls_identity {
            // The key is always a secret; the cert may be inline or a secret.
            let key = &storage_configuration
                .connection_context
                .secrets_reader
                .read_string_in_task_if(in_task, tls_identity.key)
                .await?;
            let cert = tls_identity
                .cert
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let ident = Identity::from_pem(key.as_bytes(), cert.as_bytes())?;
            client_config = client_config.identity(ident);
        }

        if let Some(http_auth) = &self.http_auth {
            let username = http_auth
                .username
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let password = match http_auth.password {
                None => None,
                Some(password) => Some(
                    storage_configuration
                        .connection_context
                        .secrets_reader
                        .read_string_in_task_if(in_task, password)
                        .await?,
                ),
            };
            client_config = client_config.auth(username, password);
        }

        let host = self
            .url
            .host_str()
            .ok_or_else(|| anyhow!("url missing host"))?;
        match &self.tunnel {
            Tunnel::Direct => {
                // Resolve the host up front, optionally rejecting
                // private/internal addresses.
                let resolved = resolve_address(
                    host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                client_config = client_config.resolve_to_addrs(
                    host,
                    &resolved
                        .iter()
                        .map(|addr| SocketAddr::new(*addr, 0))
                        .collect::<Vec<_>>(),
                )
            }
            Tunnel::Ssh(ssh_tunnel) => {
                // NOTE(review): when the URL omits a port, this tunnels to
                // port 80 regardless of scheme — presumably intentional for
                // http URLs; confirm behavior for https URLs without a port.
                let ssh_tunnel = ssh_tunnel
                    .connect(
                        storage_configuration,
                        host,
                        self.url.port().unwrap_or(80),
                        in_task,
                    )
                    .await
                    .map_err(CsrConnectError::Ssh)?;

                client_config = client_config
                    .resolve_to_addrs(host, &[SocketAddr::new(ssh_tunnel.local_addr().ip(), 0)])
                    .dynamic_url({
                        // The closure owns the SSH tunnel handle, keeping the
                        // tunnel alive for the lifetime of the client and
                        // re-reading the (possibly changed) local port on each
                        // request.
                        let remote_url = self.url.clone();
                        move || {
                            let mut url = remote_url.clone();
                            url.set_port(Some(ssh_tunnel.local_addr().port()))
                                .expect("cannot fail");
                            url
                        }
                    });
            }
            Tunnel::AwsPrivatelink(connection) => {
                // PrivateLink CSR connections do not support port overrides.
                assert_none!(connection.port);

                let privatelink_host = mz_cloud_resources::vpc_endpoint_host(
                    connection.connection_id,
                    connection.availability_zone.as_deref(),
                );
                let addrs: Vec<_> = net::lookup_host((privatelink_host, 0))
                    .await
                    .context("resolving PrivateLink host")?
                    .collect();
                client_config = client_config.resolve_to_addrs(host, &addrs)
            }
        }

        Ok(client_config.build()?)
    }

    /// Validates the connection by listing the registry's subjects.
    async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<(), anyhow::Error> {
        let client = self
            .connect(
                storage_configuration,
                InTask::No,
            )
            .await?;
        client.list_subjects().await?;
        Ok(())
    }
}
1478
1479impl<C: ConnectionAccess> AlterCompatible for CsrConnection<C> {
1480 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1481 let CsrConnection {
1482 tunnel,
1483 url: _,
1485 tls_root_cert: _,
1486 tls_identity: _,
1487 http_auth: _,
1488 } = self;
1489
1490 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1491
1492 for (compatible, field) in compatibility_checks {
1493 if !compatible {
1494 tracing::warn!(
1495 "CsrConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1496 self,
1497 other
1498 );
1499
1500 return Err(AlterError { id });
1501 }
1502 }
1503 Ok(())
1504 }
1505}
1506
/// A TLS client certificate/key pair used to authenticate with a server.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct TlsIdentity {
    /// The client certificate, in PEM format; may be inline or a secret.
    pub cert: StringOrSecret,
    /// The ID of the secret holding the certificate's private key. Always a
    /// secret, since key material must never be stored inline.
    pub key: CatalogItemId,
}
1516
/// HTTP basic-auth credentials for a schema registry connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnectionHttpAuth {
    /// The username; may be inline or a secret.
    pub username: StringOrSecret,
    /// The ID of the secret holding the password, if any.
    pub password: Option<CatalogItemId>,
}
1525
/// A connection to a PostgreSQL server.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct PostgresConnection<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// The name of the database to connect to.
    pub database: String,
    /// The username to authenticate as; may be inline or a secret.
    pub user: StringOrSecret,
    /// The ID of the secret holding the password, if any.
    pub password: Option<CatalogItemId>,
    /// How to route network traffic to the server.
    pub tunnel: Tunnel<C>,
    /// The TLS/SSL mode to use for the connection.
    pub tls_mode: SslMode,
    /// Trusted root TLS certificate, in PEM format.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate/key pair for authentication.
    pub tls_identity: Option<TlsIdentity>,
}
1549
1550impl<R: ConnectionResolver> IntoInlineConnection<PostgresConnection, R>
1551 for PostgresConnection<ReferencedConnection>
1552{
1553 fn into_inline_connection(self, r: R) -> PostgresConnection {
1554 let PostgresConnection {
1555 host,
1556 port,
1557 database,
1558 user,
1559 password,
1560 tunnel,
1561 tls_mode,
1562 tls_root_cert,
1563 tls_identity,
1564 } = self;
1565
1566 PostgresConnection {
1567 host,
1568 port,
1569 database,
1570 user,
1571 password,
1572 tunnel: tunnel.into_inline_connection(r),
1573 tls_mode,
1574 tls_root_cert,
1575 tls_identity,
1576 }
1577 }
1578}
1579
impl<C: ConnectionAccess> PostgresConnection<C> {
    // Whether this connection type should be validated by default; always true
    // for PostgreSQL connections. (Exact caller semantics live outside this
    // file — presumably the CREATE CONNECTION path; verify against callers.)
    fn validate_by_default(&self) -> bool {
        true
    }
}
1585
impl PostgresConnection<InlinedConnection> {
    /// Builds a `mz_postgres_util::Config` for this connection, reading any
    /// secrets (password, TLS key) via `secrets_reader` and applying the
    /// timeout/keepalive parameters from `storage_configuration`.
    ///
    /// `in_task` controls whether secret reads and address resolution run in
    /// a separate task.
    pub async fn config(
        &self,
        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_postgres_util::Config, anyhow::Error> {
        let params = &storage_configuration.parameters;

        let mut config = tokio_postgres::Config::new();
        config
            .host(&self.host)
            .port(self.port)
            .dbname(&self.database)
            .user(&self.user.get_string(in_task, secrets_reader).await?)
            .ssl_mode(self.tls_mode);
        if let Some(password) = self.password {
            let password = secrets_reader
                .read_string_in_task_if(in_task, password)
                .await?;
            config.password(password);
        }
        if let Some(tls_root_cert) = &self.tls_root_cert {
            let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
            config.ssl_root_cert(tls_root_cert.as_bytes());
        }
        if let Some(tls_identity) = &self.tls_identity {
            let cert = tls_identity
                .cert
                .get_string(in_task, secrets_reader)
                .await?;
            let key = secrets_reader
                .read_string_in_task_if(in_task, tls_identity.key)
                .await?;
            config.ssl_cert(cert.as_bytes()).ssl_key(key.as_bytes());
        }

        // Client-side connection tuning, each applied only when the dynamic
        // parameter is set.
        if let Some(connect_timeout) = params.pg_source_connect_timeout {
            config.connect_timeout(connect_timeout);
        }
        if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
            config.keepalives_retries(keepalives_retries);
        }
        if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
            config.keepalives_idle(keepalives_idle);
        }
        if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
            config.keepalives_interval(keepalives_interval);
        }
        if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
            config.tcp_user_timeout(tcp_user_timeout);
        }

        // Server-side session settings, passed via the `options` startup
        // parameter.
        let mut options = vec![];
        if let Some(wal_sender_timeout) = params.pg_source_wal_sender_timeout {
            options.push(format!(
                "--wal_sender_timeout={}",
                wal_sender_timeout.as_millis()
            ));
        };
        if params.pg_source_tcp_configure_server {
            // Mirror the client-side keepalive settings on the server. Note
            // the unit differences: keepalive counts are plain integers, idle
            // and interval are seconds, user timeout is milliseconds.
            if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
                options.push(format!("--tcp_keepalives_count={}", keepalives_retries));
            }
            if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
                options.push(format!(
                    "--tcp_keepalives_idle={}",
                    keepalives_idle.as_secs()
                ));
            }
            if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
                options.push(format!(
                    "--tcp_keepalives_interval={}",
                    keepalives_interval.as_secs()
                ));
            }
            if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
                options.push(format!(
                    "--tcp_user_timeout={}",
                    tcp_user_timeout.as_millis()
                ));
            }
        }
        config.options(options.join(" ").as_str());

        let tunnel = match &self.tunnel {
            Tunnel::Direct => {
                // Resolve the host up front, optionally rejecting
                // private/internal addresses.
                let resolved = resolve_address(
                    &self.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_postgres_util::TunnelConfig::Direct {
                    resolved_ips: Some(resolved),
                }
            }
            Tunnel::Ssh(SshTunnel {
                connection_id,
                connection,
            }) => {
                let secret = secrets_reader
                    .read_in_task_if(in_task, *connection_id)
                    .await?;
                let key_pair = SshKeyPair::from_bytes(&secret)?;
                let resolved = resolve_address(
                    &connection.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_postgres_util::TunnelConfig::Ssh {
                    config: SshTunnelConfig {
                        host: resolved
                            .iter()
                            .map(|a| a.to_string())
                            .collect::<BTreeSet<_>>(),
                        port: connection.port,
                        user: connection.user.clone(),
                        key_pair,
                    },
                }
            }
            Tunnel::AwsPrivatelink(connection) => {
                // PrivateLink PostgreSQL connections do not support port
                // overrides.
                assert_none!(connection.port);
                mz_postgres_util::TunnelConfig::AwsPrivatelink {
                    connection_id: connection.connection_id,
                }
            }
        };

        Ok(mz_postgres_util::Config::new(
            config,
            tunnel,
            params.ssh_timeout_config,
            in_task,
        )?)
    }

    /// Validates the connection by connecting and checking that the server is
    /// configured for logical replication: `wal_level >= logical`,
    /// `max_wal_senders >= 1`, and at least two free replication slots.
    ///
    /// Returns the connected client on success so callers can reuse it.
    pub async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<mz_postgres_util::Client, anyhow::Error> {
        let config = self
            .config(
                &storage_configuration.connection_context.secrets_reader,
                storage_configuration,
                InTask::No,
            )
            .await?;
        let client = config
            .connect(
                "connection validation",
                &storage_configuration.connection_context.ssh_tunnel_manager,
            )
            .await?;

        let wal_level = mz_postgres_util::get_wal_level(&client).await?;

        if wal_level < mz_postgres_util::replication::WalLevel::Logical {
            Err(PostgresConnectionValidationError::InsufficientWalLevel { wal_level })?;
        }

        let max_wal_senders = mz_postgres_util::get_max_wal_senders(&client).await?;

        if max_wal_senders < 1 {
            Err(PostgresConnectionValidationError::ReplicationDisabled)?;
        }

        let available_replication_slots =
            mz_postgres_util::available_replication_slots(&client).await?;

        // Two slots are required — presumably one for the snapshot and one for
        // ongoing replication; confirm against the source implementation.
        if available_replication_slots < 2 {
            Err(
                PostgresConnectionValidationError::InsufficientReplicationSlotsAvailable {
                    count: 2,
                },
            )?;
        }

        Ok(client)
    }
}
1772
/// Errors produced by [`PostgresConnection::validate`] when the server is not
/// configured for logical replication.
#[derive(Debug, Clone, thiserror::Error)]
pub enum PostgresConnectionValidationError {
    /// Fewer free replication slots than the required `count` are available.
    #[error("PostgreSQL server has insufficient number of replication slots available")]
    InsufficientReplicationSlotsAvailable { count: usize },
    /// The server's `wal_level` is below `logical`.
    #[error("server must have wal_level >= logical, but has {wal_level}")]
    InsufficientWalLevel {
        wal_level: mz_postgres_util::replication::WalLevel,
    },
    /// The server's `max_wal_senders` is zero.
    #[error("replication disabled on server")]
    ReplicationDisabled,
}
1784
1785impl PostgresConnectionValidationError {
1786 pub fn detail(&self) -> Option<String> {
1787 match self {
1788 Self::InsufficientReplicationSlotsAvailable { count } => Some(format!(
1789 "executing this statement requires {} replication slot{}",
1790 count,
1791 if *count == 1 { "" } else { "s" }
1792 )),
1793 _ => None,
1794 }
1795 }
1796
1797 pub fn hint(&self) -> Option<String> {
1798 match self {
1799 Self::InsufficientReplicationSlotsAvailable { .. } => Some(
1800 "you might be able to wait for other sources to finish snapshotting and try again"
1801 .into(),
1802 ),
1803 Self::ReplicationDisabled => Some("set max_wal_senders to a value > 0".into()),
1804 Self::InsufficientWalLevel { .. } => None,
1805 }
1806 }
1807}
1808
1809impl<C: ConnectionAccess> AlterCompatible for PostgresConnection<C> {
1810 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1811 let PostgresConnection {
1812 tunnel,
1813 host: _,
1815 port: _,
1816 database: _,
1817 user: _,
1818 password: _,
1819 tls_mode: _,
1820 tls_root_cert: _,
1821 tls_identity: _,
1822 } = self;
1823
1824 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1825
1826 for (compatible, field) in compatibility_checks {
1827 if !compatible {
1828 tracing::warn!(
1829 "PostgresConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1830 self,
1831 other
1832 );
1833
1834 return Err(AlterError { id });
1835 }
1836 }
1837 Ok(())
1838 }
1839}
1840
/// Specifies how to route network traffic to an upstream service.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Tunnel<C: ConnectionAccess = InlinedConnection> {
    /// Connect directly to the upstream host.
    Direct,
    /// Route traffic through the given SSH tunnel connection.
    Ssh(SshTunnel<C>),
    /// Route traffic through the given AWS PrivateLink connection.
    AwsPrivatelink(AwsPrivatelink),
}
1851
1852impl<R: ConnectionResolver> IntoInlineConnection<Tunnel, R> for Tunnel<ReferencedConnection> {
1853 fn into_inline_connection(self, r: R) -> Tunnel {
1854 match self {
1855 Tunnel::Direct => Tunnel::Direct,
1856 Tunnel::Ssh(ssh) => Tunnel::Ssh(ssh.into_inline_connection(r)),
1857 Tunnel::AwsPrivatelink(awspl) => Tunnel::AwsPrivatelink(awspl),
1858 }
1859 }
1860}
1861
1862impl<C: ConnectionAccess> AlterCompatible for Tunnel<C> {
1863 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1864 let compatible = match (self, other) {
1865 (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o).is_ok(),
1866 (s, o) => s == o,
1867 };
1868
1869 if !compatible {
1870 tracing::warn!(
1871 "Tunnel incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
1872 self,
1873 other
1874 );
1875
1876 return Err(AlterError { id });
1877 }
1878
1879 Ok(())
1880 }
1881}
1882
/// The TLS modes supported for MySQL connections; mirrors the
/// `mysql_async::SslOpts` combinations applied in
/// `MySqlConnection::config`.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum MySqlSslMode {
    /// No TLS.
    Disabled,
    /// TLS required, but certificate and hostname are not verified.
    Required,
    /// TLS with certificate verification, but no hostname verification.
    VerifyCa,
    /// TLS with both certificate and hostname verification.
    VerifyIdentity,
}
1893
/// A connection to a MySQL server.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct MySqlConnection<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// The username to authenticate as; may be inline or a secret.
    pub user: StringOrSecret,
    /// The ID of the secret holding the password, if any.
    pub password: Option<CatalogItemId>,
    /// How to route network traffic to the server.
    pub tunnel: Tunnel<C>,
    /// The TLS mode to use for the connection.
    pub tls_mode: MySqlSslMode,
    /// Trusted root TLS certificate, in PEM format.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate/key pair for authentication.
    pub tls_identity: Option<TlsIdentity>,
    /// Optional AWS connection whose SDK configuration is loaded and handed to
    /// the MySQL client (used for AWS-based authentication — presumably IAM;
    /// confirm against `mz_mysql_util`).
    pub aws_connection: Option<AwsConnectionReference<C>>,
}
1918
1919impl<R: ConnectionResolver> IntoInlineConnection<MySqlConnection, R>
1920 for MySqlConnection<ReferencedConnection>
1921{
1922 fn into_inline_connection(self, r: R) -> MySqlConnection {
1923 let MySqlConnection {
1924 host,
1925 port,
1926 user,
1927 password,
1928 tunnel,
1929 tls_mode,
1930 tls_root_cert,
1931 tls_identity,
1932 aws_connection,
1933 } = self;
1934
1935 MySqlConnection {
1936 host,
1937 port,
1938 user,
1939 password,
1940 tunnel: tunnel.into_inline_connection(&r),
1941 tls_mode,
1942 tls_root_cert,
1943 tls_identity,
1944 aws_connection: aws_connection.map(|aws| aws.into_inline_connection(&r)),
1945 }
1946 }
1947}
1948
impl<C: ConnectionAccess> MySqlConnection<C> {
    // Whether this connection type should be validated by default; always true
    // for MySQL connections. (Exact caller semantics live outside this file —
    // presumably the CREATE CONNECTION path; verify against callers.)
    fn validate_by_default(&self) -> bool {
        true
    }
}
1954
impl MySqlConnection<InlinedConnection> {
    /// Builds a `mz_mysql_util::Config` for this connection, reading any
    /// secrets (password, TLS key) via `secrets_reader`, translating
    /// `tls_mode` into `mysql_async` SSL options, and wiring up the configured
    /// tunnel and optional AWS SDK configuration.
    ///
    /// `in_task` controls whether secret reads and address resolution run in
    /// a separate task.
    pub async fn config(
        &self,
        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_mysql_util::Config, anyhow::Error> {
        let mut opts = mysql_async::OptsBuilder::default()
            .ip_or_hostname(&self.host)
            .tcp_port(self.port)
            .user(Some(&self.user.get_string(in_task, secrets_reader).await?));

        if let Some(password) = self.password {
            let password = secrets_reader
                .read_string_in_task_if(in_task, password)
                .await?;
            opts = opts.pass(Some(password));
        }

        // Map our TLS mode onto mysql_async's options: each step up from
        // `Required` removes one of the "danger" opt-outs.
        let mut ssl_opts = match self.tls_mode {
            MySqlSslMode::Disabled => None,
            MySqlSslMode::Required => Some(
                mysql_async::SslOpts::default()
                    .with_danger_accept_invalid_certs(true)
                    .with_danger_skip_domain_validation(true),
            ),
            MySqlSslMode::VerifyCa => {
                Some(mysql_async::SslOpts::default().with_danger_skip_domain_validation(true))
            }
            MySqlSslMode::VerifyIdentity => Some(mysql_async::SslOpts::default()),
        };

        // A custom root certificate only matters when we actually verify
        // certificates.
        if matches!(
            self.tls_mode,
            MySqlSslMode::VerifyCa | MySqlSslMode::VerifyIdentity
        ) {
            if let Some(tls_root_cert) = &self.tls_root_cert {
                let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
                ssl_opts = ssl_opts.map(|opts| {
                    opts.with_root_certs(vec![tls_root_cert.as_bytes().to_vec().into()])
                });
            }
        }

        if let Some(identity) = &self.tls_identity {
            let key = secrets_reader
                .read_string_in_task_if(in_task, identity.key)
                .await?;
            let cert = identity.cert.get_string(in_task, secrets_reader).await?;
            // mysql_async expects the client identity as a PKCS#12 archive, so
            // convert the PEM pair.
            let mut archive = mz_tls_util::pkcs12der_from_pem(key.as_bytes(), cert.as_bytes())?;
            let der = std::mem::take(&mut archive.der);
            let pass = std::mem::take(&mut archive.pass);

            ssl_opts = ssl_opts.map(|opts| {
                opts.with_client_identity(Some(
                    mysql_async::ClientIdentity::new(der.into()).with_password(pass),
                ))
            });
        }

        opts = opts.ssl_opts(ssl_opts);

        let tunnel = match &self.tunnel {
            Tunnel::Direct => {
                // Resolve the host up front, optionally rejecting
                // private/internal addresses.
                let resolved = resolve_address(
                    &self.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_mysql_util::TunnelConfig::Direct {
                    resolved_ips: Some(resolved),
                }
            }
            Tunnel::Ssh(SshTunnel {
                connection_id,
                connection,
            }) => {
                let secret = secrets_reader
                    .read_in_task_if(in_task, *connection_id)
                    .await?;
                let key_pair = SshKeyPair::from_bytes(&secret)?;
                let resolved = resolve_address(
                    &connection.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_mysql_util::TunnelConfig::Ssh {
                    config: SshTunnelConfig {
                        host: resolved
                            .iter()
                            .map(|a| a.to_string())
                            .collect::<BTreeSet<_>>(),
                        port: connection.port,
                        user: connection.user.clone(),
                        key_pair,
                    },
                }
            }
            Tunnel::AwsPrivatelink(connection) => {
                // PrivateLink MySQL connections do not support port overrides.
                assert_none!(connection.port);
                mz_mysql_util::TunnelConfig::AwsPrivatelink {
                    connection_id: connection.connection_id,
                }
            }
        };

        let aws_config = match self.aws_connection.as_ref() {
            None => None,
            Some(aws_ref) => Some(
                aws_ref
                    .connection
                    .load_sdk_config(
                        &storage_configuration.connection_context,
                        aws_ref.connection_id,
                        in_task,
                    )
                    .await?,
            ),
        };

        Ok(mz_mysql_util::Config::new(
            opts,
            tunnel,
            storage_configuration.parameters.ssh_timeout_config,
            in_task,
            storage_configuration
                .parameters
                .mysql_source_timeouts
                .clone(),
            aws_config,
        )?)
    }

    /// Validates the connection by connecting and checking the server's
    /// replication-related system settings (GTID consistency, full row binlog
    /// format, replication commit order).
    ///
    /// All setting violations are collected and reported together; any other
    /// error aborts immediately. Returns the open connection on success.
    pub async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<MySqlConn, MySqlConnectionValidationError> {
        let config = self
            .config(
                &storage_configuration.connection_context.secrets_reader,
                storage_configuration,
                InTask::No,
            )
            .await?;
        let mut conn = config
            .connect(
                "connection validation",
                &storage_configuration.connection_context.ssh_tunnel_manager,
            )
            .await?;

        let mut setting_errors = vec![];
        let gtid_res = mz_mysql_util::ensure_gtid_consistency(&mut conn).await;
        let binlog_res = mz_mysql_util::ensure_full_row_binlog_format(&mut conn).await;
        let order_res = mz_mysql_util::ensure_replication_commit_order(&mut conn).await;
        for res in [gtid_res, binlog_res, order_res] {
            match res {
                Err(MySqlError::InvalidSystemSetting {
                    setting,
                    expected,
                    actual,
                }) => {
                    setting_errors.push((setting, expected, actual));
                }
                Err(err) => Err(err)?,
                Ok(()) => {}
            }
        }
        if !setting_errors.is_empty() {
            Err(MySqlConnectionValidationError::ReplicationSettingsError(
                setting_errors,
            ))?;
        }

        Ok(conn)
    }
}
2143
/// Errors produced by [`MySqlConnection::validate`].
#[derive(Debug, thiserror::Error)]
pub enum MySqlConnectionValidationError {
    /// One or more replication-related system settings had unexpected values;
    /// each entry is `(setting, expected, actual)`.
    #[error("Invalid MySQL system replication settings")]
    ReplicationSettingsError(Vec<(String, String, String)>),
    /// An error from the MySQL client itself.
    #[error(transparent)]
    Client(#[from] MySqlError),
    /// Any other error encountered during validation.
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}
2153
2154impl MySqlConnectionValidationError {
2155 pub fn detail(&self) -> Option<String> {
2156 match self {
2157 Self::ReplicationSettingsError(settings) => Some(format!(
2158 "Invalid MySQL system replication settings: {}",
2159 itertools::join(
2160 settings.iter().map(|(setting, expected, actual)| format!(
2161 "{}: expected {}, got {}",
2162 setting, expected, actual
2163 )),
2164 "; "
2165 )
2166 )),
2167 _ => None,
2168 }
2169 }
2170
2171 pub fn hint(&self) -> Option<String> {
2172 match self {
2173 Self::ReplicationSettingsError(_) => {
2174 Some("Set the necessary MySQL database system settings.".into())
2175 }
2176 _ => None,
2177 }
2178 }
2179}
2180
2181impl<C: ConnectionAccess> AlterCompatible for MySqlConnection<C> {
2182 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2183 let MySqlConnection {
2184 tunnel,
2185 host: _,
2187 port: _,
2188 user: _,
2189 password: _,
2190 tls_mode: _,
2191 tls_root_cert: _,
2192 tls_identity: _,
2193 aws_connection: _,
2194 } = self;
2195
2196 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2197
2198 for (compatible, field) in compatibility_checks {
2199 if !compatible {
2200 tracing::warn!(
2201 "MySqlConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2202 self,
2203 other
2204 );
2205
2206 return Err(AlterError { id });
2207 }
2208 }
2209 Ok(())
2210 }
2211}
2212
/// A connection to a SQL Server instance.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SqlServerConnectionDetails<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// The name of the database to connect to.
    pub database: String,
    /// The username to authenticate as; may be inline or a secret.
    pub user: StringOrSecret,
    /// The ID of the secret holding the password. Unlike the other connection
    /// types, the password is mandatory here.
    pub password: CatalogItemId,
    /// How to route network traffic to the server.
    pub tunnel: Tunnel<C>,
    /// The TLS encryption level to request.
    pub encryption: mz_sql_server_util::config::EncryptionLevel,
    /// How strictly to validate the server's TLS certificate.
    pub certificate_validation_policy: mz_sql_server_util::config::CertificateValidationPolicy,
    /// Trusted root TLS certificate, in PEM format; required when the
    /// validation policy is `VerifyCA` (see `resolve_config`).
    pub tls_root_cert: Option<StringOrSecret>,
}
2240
impl<C: ConnectionAccess> SqlServerConnectionDetails<C> {
    // Whether this connection type should be validated by default; always true
    // for SQL Server connections. (Exact caller semantics live outside this
    // file — presumably the CREATE CONNECTION path; verify against callers.)
    fn validate_by_default(&self) -> bool {
        true
    }
}
2246
2247impl SqlServerConnectionDetails<InlinedConnection> {
2248 pub async fn validate(
2250 &self,
2251 _id: CatalogItemId,
2252 storage_configuration: &StorageConfiguration,
2253 ) -> Result<mz_sql_server_util::Client, anyhow::Error> {
2254 let config = self
2255 .resolve_config(
2256 &storage_configuration.connection_context.secrets_reader,
2257 storage_configuration,
2258 InTask::No,
2259 )
2260 .await?;
2261 tracing::debug!(?config, "Validating SQL Server connection");
2262
2263 let mut client = mz_sql_server_util::Client::connect(config).await?;
2264
2265 let mut replication_errors = vec![];
2270 for error in [
2271 mz_sql_server_util::inspect::ensure_database_cdc_enabled(&mut client).await,
2272 mz_sql_server_util::inspect::ensure_snapshot_isolation_enabled(&mut client).await,
2273 mz_sql_server_util::inspect::ensure_sql_server_agent_running(&mut client).await,
2274 ] {
2275 match error {
2276 Err(mz_sql_server_util::SqlServerError::InvalidSystemSetting {
2277 name,
2278 expected,
2279 actual,
2280 }) => replication_errors.push((name, expected, actual)),
2281 Err(other) => Err(other)?,
2282 Ok(()) => (),
2283 }
2284 }
2285 if !replication_errors.is_empty() {
2286 Err(SqlServerConnectionValidationError::ReplicationSettingsError(replication_errors))?;
2287 }
2288
2289 Ok(client)
2290 }
2291
2292 pub async fn resolve_config(
2302 &self,
2303 secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
2304 storage_configuration: &StorageConfiguration,
2305 in_task: InTask,
2306 ) -> Result<mz_sql_server_util::Config, anyhow::Error> {
2307 let dyncfg = storage_configuration.config_set();
2308 let mut inner_config = tiberius::Config::new();
2309
2310 inner_config.host(&self.host);
2312 inner_config.port(self.port);
2313 inner_config.database(self.database.clone());
2314 inner_config.encryption(self.encryption.into());
2315 match self.certificate_validation_policy {
2316 mz_sql_server_util::config::CertificateValidationPolicy::TrustAll => {
2317 inner_config.trust_cert()
2318 }
2319 mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA => {
2320 inner_config.trust_cert_ca_pem(
2321 self.tls_root_cert
2322 .as_ref()
2323 .unwrap()
2324 .get_string(in_task, secrets_reader)
2325 .await
2326 .context("ca certificate")?,
2327 );
2328 }
2329 mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem => (), }
2331
2332 inner_config.application_name("materialize");
2333
2334 let user = self
2336 .user
2337 .get_string(in_task, secrets_reader)
2338 .await
2339 .context("username")?;
2340 let password = secrets_reader
2341 .read_string_in_task_if(in_task, self.password)
2342 .await
2343 .context("password")?;
2344 inner_config.authentication(tiberius::AuthMethod::sql_server(user, password));
2347
2348 let enfoce_external_addresses = ENFORCE_EXTERNAL_ADDRESSES.get(dyncfg);
2351
2352 let tunnel = match &self.tunnel {
2353 Tunnel::Direct => mz_sql_server_util::config::TunnelConfig::Direct,
2354 Tunnel::Ssh(SshTunnel {
2355 connection_id,
2356 connection: ssh_connection,
2357 }) => {
2358 let secret = secrets_reader
2359 .read_in_task_if(in_task, *connection_id)
2360 .await
2361 .context("ssh secret")?;
2362 let key_pair = SshKeyPair::from_bytes(&secret).context("ssh key pair")?;
2363 let addresses = resolve_address(&ssh_connection.host, enfoce_external_addresses)
2366 .await
2367 .context("ssh tunnel")?;
2368
2369 let config = SshTunnelConfig {
2370 host: addresses.into_iter().map(|a| a.to_string()).collect(),
2371 port: ssh_connection.port,
2372 user: ssh_connection.user.clone(),
2373 key_pair,
2374 };
2375 mz_sql_server_util::config::TunnelConfig::Ssh {
2376 config,
2377 manager: storage_configuration
2378 .connection_context
2379 .ssh_tunnel_manager
2380 .clone(),
2381 timeout: storage_configuration.parameters.ssh_timeout_config.clone(),
2382 host: self.host.clone(),
2383 port: self.port,
2384 }
2385 }
2386 Tunnel::AwsPrivatelink(private_link_connection) => {
2387 assert_none!(private_link_connection.port);
2388 mz_sql_server_util::config::TunnelConfig::AwsPrivatelink {
2389 connection_id: private_link_connection.connection_id,
2390 port: self.port,
2391 }
2392 }
2393 };
2394
2395 Ok(mz_sql_server_util::Config::new(
2396 inner_config,
2397 tunnel,
2398 in_task,
2399 ))
2400 }
2401}
2402
/// Errors produced by [`SqlServerConnectionDetails::validate`].
#[derive(Debug, Clone, thiserror::Error)]
pub enum SqlServerConnectionValidationError {
    /// One or more replication-related system settings had unexpected values;
    /// each entry is `(setting, expected, actual)`.
    #[error("Invalid SQL Server system replication settings")]
    ReplicationSettingsError(Vec<(String, String, String)>),
}
2408
2409impl SqlServerConnectionValidationError {
2410 pub fn detail(&self) -> Option<String> {
2411 match self {
2412 Self::ReplicationSettingsError(settings) => Some(format!(
2413 "Invalid SQL Server system replication settings: {}",
2414 itertools::join(
2415 settings.iter().map(|(setting, expected, actual)| format!(
2416 "{}: expected {}, got {}",
2417 setting, expected, actual
2418 )),
2419 "; "
2420 )
2421 )),
2422 }
2423 }
2424
2425 pub fn hint(&self) -> Option<String> {
2426 match self {
2427 _ => None,
2428 }
2429 }
2430}
2431
2432impl<R: ConnectionResolver> IntoInlineConnection<SqlServerConnectionDetails, R>
2433 for SqlServerConnectionDetails<ReferencedConnection>
2434{
2435 fn into_inline_connection(self, r: R) -> SqlServerConnectionDetails {
2436 let SqlServerConnectionDetails {
2437 host,
2438 port,
2439 database,
2440 user,
2441 password,
2442 tunnel,
2443 encryption,
2444 certificate_validation_policy,
2445 tls_root_cert,
2446 } = self;
2447
2448 SqlServerConnectionDetails {
2449 host,
2450 port,
2451 database,
2452 user,
2453 password,
2454 tunnel: tunnel.into_inline_connection(&r),
2455 encryption,
2456 certificate_validation_policy,
2457 tls_root_cert,
2458 }
2459 }
2460}
2461
2462impl<C: ConnectionAccess> AlterCompatible for SqlServerConnectionDetails<C> {
2463 fn alter_compatible(
2464 &self,
2465 id: mz_repr::GlobalId,
2466 other: &Self,
2467 ) -> Result<(), crate::controller::AlterError> {
2468 let SqlServerConnectionDetails {
2469 tunnel,
2470 host: _,
2472 port: _,
2473 database: _,
2474 user: _,
2475 password: _,
2476 encryption: _,
2477 certificate_validation_policy: _,
2478 tls_root_cert: _,
2479 } = self;
2480
2481 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2482
2483 for (compatible, field) in compatibility_checks {
2484 if !compatible {
2485 tracing::warn!(
2486 "SqlServerConnectionDetails incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2487 self,
2488 other
2489 );
2490
2491 return Err(AlterError { id });
2492 }
2493 }
2494 Ok(())
2495 }
2496}
2497
/// Details for connecting to an SSH bastion host.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SshConnection {
    /// Hostname of the SSH bastion.
    pub host: String,
    /// Port on which the bastion's SSH daemon listens.
    pub port: u16,
    /// Username to authenticate as.
    pub user: String,
}
2505
2506use self::inline::{
2507 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
2508 ReferencedConnection,
2509};
2510
impl AlterCompatible for SshConnection {
    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        // Any pair of SSH connections is considered compatible: every field
        // (host, port, user) may be altered freely.
        Ok(())
    }
}
2517
/// Details for tunneling a connection through an AWS PrivateLink endpoint.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelink {
    /// ID of the AWS PrivateLink connection to tunnel through.
    pub connection_id: CatalogItemId,
    /// Availability zone, if any. NOTE(review): presumably selects which AZ of
    /// the endpoint to dial — confirm against the tunnel setup code.
    pub availability_zone: Option<String>,
    /// Optional port override. Callers that manage the port themselves assert
    /// this is `None` (e.g. the SQL Server tunnel above).
    pub port: Option<u16>,
}
2529
2530impl AlterCompatible for AwsPrivatelink {
2531 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2532 let AwsPrivatelink {
2533 connection_id,
2534 availability_zone: _,
2535 port: _,
2536 } = self;
2537
2538 let compatibility_checks = [(connection_id == &other.connection_id, "connection_id")];
2539
2540 for (compatible, field) in compatibility_checks {
2541 if !compatible {
2542 tracing::warn!(
2543 "AwsPrivatelink incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2544 self,
2545 other
2546 );
2547
2548 return Err(AlterError { id });
2549 }
2550 }
2551
2552 Ok(())
2553 }
2554}
2555
/// Specifies an SSH tunnel through which a connection is established.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SshTunnel<C: ConnectionAccess = InlinedConnection> {
    /// ID of the SSH connection; also the key under which the SSH key pair
    /// secret is read.
    pub connection_id: CatalogItemId,
    /// The SSH connection details, either inlined or referenced.
    pub connection: C::Ssh,
}
2564
2565impl<R: ConnectionResolver> IntoInlineConnection<SshTunnel, R> for SshTunnel<ReferencedConnection> {
2566 fn into_inline_connection(self, r: R) -> SshTunnel {
2567 let SshTunnel {
2568 connection,
2569 connection_id,
2570 } = self;
2571
2572 SshTunnel {
2573 connection: r.resolve_connection(connection).unwrap_ssh(),
2574 connection_id,
2575 }
2576 }
2577}
2578
2579impl SshTunnel<InlinedConnection> {
2580 async fn connect(
2583 &self,
2584 storage_configuration: &StorageConfiguration,
2585 remote_host: &str,
2586 remote_port: u16,
2587 in_task: InTask,
2588 ) -> Result<ManagedSshTunnelHandle, anyhow::Error> {
2589 let resolved = resolve_address(
2591 &self.connection.host,
2592 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2593 )
2594 .await?;
2595 storage_configuration
2596 .connection_context
2597 .ssh_tunnel_manager
2598 .connect(
2599 SshTunnelConfig {
2600 host: resolved
2601 .iter()
2602 .map(|a| a.to_string())
2603 .collect::<BTreeSet<_>>(),
2604 port: self.connection.port,
2605 user: self.connection.user.clone(),
2606 key_pair: SshKeyPair::from_bytes(
2607 &storage_configuration
2608 .connection_context
2609 .secrets_reader
2610 .read_in_task_if(in_task, self.connection_id)
2611 .await?,
2612 )?,
2613 },
2614 remote_host,
2615 remote_port,
2616 storage_configuration.parameters.ssh_timeout_config,
2617 in_task,
2618 )
2619 .await
2620 }
2621}
2622
2623impl<C: ConnectionAccess> AlterCompatible for SshTunnel<C> {
2624 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2625 let SshTunnel {
2626 connection_id,
2627 connection,
2628 } = self;
2629
2630 let compatibility_checks = [
2631 (connection_id == &other.connection_id, "connection_id"),
2632 (
2633 connection.alter_compatible(id, &other.connection).is_ok(),
2634 "connection",
2635 ),
2636 ];
2637
2638 for (compatible, field) in compatibility_checks {
2639 if !compatible {
2640 tracing::warn!(
2641 "SshTunnel incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2642 self,
2643 other
2644 );
2645
2646 return Err(AlterError { id });
2647 }
2648 }
2649
2650 Ok(())
2651 }
2652}
2653
2654impl SshConnection {
2655 #[allow(clippy::unused_async)]
2656 async fn validate(
2657 &self,
2658 id: CatalogItemId,
2659 storage_configuration: &StorageConfiguration,
2660 ) -> Result<(), anyhow::Error> {
2661 let secret = storage_configuration
2662 .connection_context
2663 .secrets_reader
2664 .read_in_task_if(
2665 InTask::No,
2667 id,
2668 )
2669 .await?;
2670 let key_pair = SshKeyPair::from_bytes(&secret)?;
2671
2672 let resolved = resolve_address(
2674 &self.host,
2675 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2676 )
2677 .await?;
2678
2679 let config = SshTunnelConfig {
2680 host: resolved
2681 .iter()
2682 .map(|a| a.to_string())
2683 .collect::<BTreeSet<_>>(),
2684 port: self.port,
2685 user: self.user.clone(),
2686 key_pair,
2687 };
2688 config
2691 .validate(storage_configuration.parameters.ssh_timeout_config)
2692 .await
2693 }
2694
2695 fn validate_by_default(&self) -> bool {
2696 false
2697 }
2698}
2699
2700impl AwsPrivatelinkConnection {
2701 #[allow(clippy::unused_async)]
2702 async fn validate(
2703 &self,
2704 id: CatalogItemId,
2705 storage_configuration: &StorageConfiguration,
2706 ) -> Result<(), anyhow::Error> {
2707 let Some(ref cloud_resource_reader) = storage_configuration
2708 .connection_context
2709 .cloud_resource_reader
2710 else {
2711 return Err(anyhow!("AWS PrivateLink connections are unsupported"));
2712 };
2713
2714 let status = cloud_resource_reader.read(id).await?;
2716
2717 let availability = status
2718 .conditions
2719 .as_ref()
2720 .and_then(|conditions| conditions.iter().find(|c| c.type_ == "Available"));
2721
2722 match availability {
2723 Some(condition) if condition.status == "True" => Ok(()),
2724 Some(condition) => Err(anyhow!("{}", condition.message)),
2725 None => Err(anyhow!("Endpoint availability is unknown")),
2726 }
2727 }
2728
2729 fn validate_by_default(&self) -> bool {
2730 false
2731 }
2732}