1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::net::SocketAddr;
15use std::sync::Arc;
16
17use anyhow::{Context, anyhow};
18use async_trait::async_trait;
19use aws_credential_types::provider::ProvideCredentials;
20use iceberg::Catalog;
21use iceberg::CatalogBuilder;
22use iceberg::io::{
23 AwsCredential, AwsCredentialLoad, CustomAwsCredentialLoader, S3_ACCESS_KEY_ID,
24 S3_DISABLE_EC2_METADATA, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use iceberg_catalog_rest::{
27 REST_CATALOG_PROP_URI, REST_CATALOG_PROP_WAREHOUSE, RestCatalogBuilder,
28};
29use itertools::Itertools;
30use mz_ccsr::tls::{Certificate, Identity};
31use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceReader, vpc_endpoint_host};
32use mz_dyncfg::ConfigSet;
33use mz_kafka_util::client::{
34 BrokerAddr, BrokerRewrite, HostMappingRules, MzClientContext, MzKafkaError, TunnelConfig,
35 TunnelingClientContext,
36};
37use mz_mysql_util::{MySqlConn, MySqlError};
38use mz_ore::assert_none;
39use mz_ore::error::ErrorExt;
40use mz_ore::future::{InTask, OreFutureExt};
41use mz_ore::netio::resolve_address;
42use mz_ore::num::NonNeg;
43use mz_repr::{CatalogItemId, GlobalId};
44use mz_secrets::SecretsReader;
45use mz_sql_parser::ast::ConnectionRulePattern;
46use mz_ssh_util::keys::SshKeyPair;
47use mz_ssh_util::tunnel::SshTunnelConfig;
48use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager};
49use mz_tracing::CloneableEnvFilter;
50use rdkafka::ClientContext;
51use rdkafka::config::FromClientConfigAndContext;
52use rdkafka::consumer::{BaseConsumer, Consumer};
53use regex::Regex;
54use serde::{Deserialize, Deserializer, Serialize};
55use tokio::net;
56use tokio::runtime::Handle;
57use tokio_postgres::config::SslMode;
58use tracing::{debug, info, warn};
59use url::Url;
60
61use crate::AlterCompatible;
62use crate::configuration::StorageConfiguration;
63use crate::connections::aws::{
64 AwsAuth, AwsConnection, AwsConnectionReference, AwsConnectionValidationError,
65};
66use crate::connections::string_or_secret::StringOrSecret;
67use crate::controller::AlterError;
68use crate::dyncfgs::{
69 ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES,
70 KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM, KAFKA_RECONNECT_BACKOFF,
71 KAFKA_RECONNECT_BACKOFF_MAX, KAFKA_RETRY_BACKOFF, KAFKA_RETRY_BACKOFF_MAX,
72};
73use crate::errors::{ContextCreationError, CsrConnectError};
74
// AWS connection types (`AwsConnection`, `AwsAuth`, ...).
pub mod aws;
pub mod inline;
// `StringOrSecret`: a value given either inline or as a reference to a secret.
pub mod string_or_secret;

/// Property key under which `RestIcebergCatalog::scope` is passed to the REST
/// catalog builder in `connect_rest`.
const REST_CATALOG_PROP_SCOPE: &str = "scope";
/// Property key under which the resolved `RestIcebergCatalog::credential` is
/// passed to the REST catalog builder in `connect_rest`.
const REST_CATALOG_PROP_CREDENTIAL: &str = "credential";
81
82struct AwsSdkCredentialLoader {
90 provider: aws_credential_types::provider::SharedCredentialsProvider,
93}
94
95impl AwsSdkCredentialLoader {
96 fn new(provider: aws_credential_types::provider::SharedCredentialsProvider) -> Self {
97 Self { provider }
98 }
99}
100
101#[async_trait]
102impl AwsCredentialLoad for AwsSdkCredentialLoader {
103 async fn load_credential(
104 &self,
105 _client: reqwest::Client,
106 ) -> anyhow::Result<Option<AwsCredential>> {
107 let creds = self
108 .provider
109 .provide_credentials()
110 .await
111 .map_err(|e| {
112 warn!(
113 error = %e.display_with_causes(),
114 "failed to load AWS credentials for Iceberg FileIO from SDK provider"
115 );
116 e
117 })
118 .context(
119 "failed to load AWS credentials from SDK provider for Iceberg FileIO \
120 (credential source may be temporarily unavailable)",
121 )?;
122
123 Ok(Some(AwsCredential {
124 access_key_id: creds.access_key_id().to_string(),
125 secret_access_key: creds.secret_access_key().to_string(),
126 session_token: creds.session_token().map(|s| s.to_string()),
127 expires_in: creds.expiry().map(|t| t.into()),
128 }))
129 }
130}
131
132#[async_trait::async_trait]
134trait SecretsReaderExt {
135 async fn read_in_task_if(
137 &self,
138 in_task: InTask,
139 id: CatalogItemId,
140 ) -> Result<Vec<u8>, anyhow::Error>;
141
142 async fn read_string_in_task_if(
144 &self,
145 in_task: InTask,
146 id: CatalogItemId,
147 ) -> Result<String, anyhow::Error>;
148}
149
150#[async_trait::async_trait]
151impl SecretsReaderExt for Arc<dyn SecretsReader> {
152 async fn read_in_task_if(
153 &self,
154 in_task: InTask,
155 id: CatalogItemId,
156 ) -> Result<Vec<u8>, anyhow::Error> {
157 let sr = Arc::clone(self);
158 async move { sr.read(id).await }
159 .run_in_task_if(in_task, || "secrets_reader_read".to_string())
160 .await
161 }
162 async fn read_string_in_task_if(
163 &self,
164 in_task: InTask,
165 id: CatalogItemId,
166 ) -> Result<String, anyhow::Error> {
167 let sr = Arc::clone(self);
168 async move { sr.read_string(id).await }
169 .run_in_task_if(in_task, || "secrets_reader_read".to_string())
170 .await
171 }
172}
173
/// Shared context needed to create and validate connections to external
/// systems.
#[derive(Debug, Clone)]
pub struct ConnectionContext {
    /// The environment's ID. It is embedded in generated Kafka progress topic
    /// names and Kafka ID bases.
    pub environment_id: String,
    /// Log level handed to newly created librdkafka client configs.
    pub librdkafka_log_level: tracing::Level,
    /// Optional AWS external ID prefix. NOTE(review): consumers live outside
    /// this chunk — confirm exact semantics.
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    /// Optional AWS role ARN used by AWS connections. NOTE(review): consumers
    /// live outside this chunk — confirm exact semantics.
    pub aws_connection_role_arn: Option<String>,
    /// Reader used to resolve secret references in connection options.
    pub secrets_reader: Arc<dyn SecretsReader>,
    /// Optional reader for cloud resources; `None` in test contexts.
    pub cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
    /// SSH tunnel manager handed to Kafka client contexts.
    pub ssh_tunnel_manager: SshTunnelManager,
}
201
202impl ConnectionContext {
203 pub fn from_cli_args(
211 environment_id: String,
212 startup_log_level: &CloneableEnvFilter,
213 aws_external_id_prefix: Option<AwsExternalIdPrefix>,
214 aws_connection_role_arn: Option<String>,
215 secrets_reader: Arc<dyn SecretsReader>,
216 cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
217 ) -> ConnectionContext {
218 ConnectionContext {
219 environment_id,
220 librdkafka_log_level: mz_ore::tracing::crate_level(
221 &startup_log_level.clone().into(),
222 "librdkafka",
223 ),
224 aws_external_id_prefix,
225 aws_connection_role_arn,
226 secrets_reader,
227 cloud_resource_reader,
228 ssh_tunnel_manager: SshTunnelManager::default(),
229 }
230 }
231
232 pub fn for_tests(secrets_reader: Arc<dyn SecretsReader>) -> ConnectionContext {
234 ConnectionContext {
235 environment_id: "test-environment-id".into(),
236 librdkafka_log_level: tracing::Level::INFO,
237 aws_external_id_prefix: Some(
238 AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable(
239 "test-aws-external-id-prefix",
240 )
241 .expect("infallible"),
242 ),
243 aws_connection_role_arn: Some(
244 "arn:aws:iam::123456789000:role/MaterializeConnection".into(),
245 ),
246 secrets_reader,
247 cloud_resource_reader: None,
248 ssh_tunnel_manager: SshTunnelManager::default(),
249 }
250 }
251}
252
/// A connection to an external system.
///
/// `C` selects how references to other connections are represented. By
/// default they are inlined; `Connection<ReferencedConnection>` is resolved
/// into the inlined form via `IntoInlineConnection`.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Connection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaConnection<C>),
    /// Confluent Schema Registry connection.
    Csr(CsrConnection<C>),
    GlueSchemaRegistry(GlueSchemaRegistryConnection<C>),
    Postgres(PostgresConnection<C>),
    Ssh(SshConnection),
    Aws(AwsConnection),
    AwsPrivatelink(AwsPrivatelinkConnection),
    MySql(MySqlConnection<C>),
    SqlServer(SqlServerConnectionDetails<C>),
    IcebergCatalog(IcebergCatalogConnection<C>),
}
266
267impl<R: ConnectionResolver> IntoInlineConnection<Connection, R>
268 for Connection<ReferencedConnection>
269{
270 fn into_inline_connection(self, r: R) -> Connection {
271 match self {
272 Connection::Kafka(kafka) => Connection::Kafka(kafka.into_inline_connection(r)),
273 Connection::Csr(csr) => Connection::Csr(csr.into_inline_connection(r)),
274 Connection::GlueSchemaRegistry(glue) => {
275 Connection::GlueSchemaRegistry(glue.into_inline_connection(r))
276 }
277 Connection::Postgres(pg) => Connection::Postgres(pg.into_inline_connection(r)),
278 Connection::Ssh(ssh) => Connection::Ssh(ssh),
279 Connection::Aws(aws) => Connection::Aws(aws),
280 Connection::AwsPrivatelink(awspl) => Connection::AwsPrivatelink(awspl),
281 Connection::MySql(mysql) => Connection::MySql(mysql.into_inline_connection(r)),
282 Connection::SqlServer(sql_server) => {
283 Connection::SqlServer(sql_server.into_inline_connection(r))
284 }
285 Connection::IcebergCatalog(iceberg) => {
286 Connection::IcebergCatalog(iceberg.into_inline_connection(r))
287 }
288 }
289 }
290}
291
292impl<C: ConnectionAccess> Connection<C> {
293 pub fn validate_by_default(&self) -> bool {
295 match self {
296 Connection::Kafka(conn) => conn.validate_by_default(),
297 Connection::Csr(conn) => conn.validate_by_default(),
298 Connection::GlueSchemaRegistry(conn) => conn.validate_by_default(),
299 Connection::Postgres(conn) => conn.validate_by_default(),
300 Connection::Ssh(conn) => conn.validate_by_default(),
301 Connection::Aws(conn) => conn.validate_by_default(),
302 Connection::AwsPrivatelink(conn) => conn.validate_by_default(),
303 Connection::MySql(conn) => conn.validate_by_default(),
304 Connection::SqlServer(conn) => conn.validate_by_default(),
305 Connection::IcebergCatalog(conn) => conn.validate_by_default(),
306 }
307 }
308}
309
310impl Connection<InlinedConnection> {
311 pub async fn validate(
313 &self,
314 id: CatalogItemId,
315 storage_configuration: &StorageConfiguration,
316 ) -> Result<(), ConnectionValidationError> {
317 match self {
318 Connection::Kafka(conn) => conn.validate(id, storage_configuration).await?,
319 Connection::Csr(conn) => conn.validate(id, storage_configuration).await?,
320 Connection::GlueSchemaRegistry(conn) => {
321 conn.validate(id, storage_configuration).await?
322 }
323 Connection::Postgres(conn) => {
324 conn.validate(id, storage_configuration).await?;
325 }
326 Connection::Ssh(conn) => conn.validate(id, storage_configuration).await?,
327 Connection::Aws(conn) => conn.validate(id, storage_configuration).await?,
328 Connection::AwsPrivatelink(conn) => conn.validate(id, storage_configuration).await?,
329 Connection::MySql(conn) => {
330 conn.validate(id, storage_configuration).await?;
331 }
332 Connection::SqlServer(conn) => {
333 conn.validate(id, storage_configuration).await?;
334 }
335 Connection::IcebergCatalog(conn) => conn.validate(id, storage_configuration).await?,
336 }
337 Ok(())
338 }
339
340 pub fn unwrap_kafka(self) -> <InlinedConnection as ConnectionAccess>::Kafka {
341 match self {
342 Self::Kafka(conn) => conn,
343 o => unreachable!("{o:?} is not a Kafka connection"),
344 }
345 }
346
347 pub fn unwrap_pg(self) -> <InlinedConnection as ConnectionAccess>::Pg {
348 match self {
349 Self::Postgres(conn) => conn,
350 o => unreachable!("{o:?} is not a Postgres connection"),
351 }
352 }
353
354 pub fn unwrap_mysql(self) -> <InlinedConnection as ConnectionAccess>::MySql {
355 match self {
356 Self::MySql(conn) => conn,
357 o => unreachable!("{o:?} is not a MySQL connection"),
358 }
359 }
360
361 pub fn unwrap_sql_server(self) -> <InlinedConnection as ConnectionAccess>::SqlServer {
362 match self {
363 Self::SqlServer(conn) => conn,
364 o => unreachable!("{o:?} is not a SQL Server connection"),
365 }
366 }
367
368 pub fn unwrap_aws(self) -> <InlinedConnection as ConnectionAccess>::Aws {
369 match self {
370 Self::Aws(conn) => conn,
371 o => unreachable!("{o:?} is not an AWS connection"),
372 }
373 }
374
375 pub fn unwrap_ssh(self) -> <InlinedConnection as ConnectionAccess>::Ssh {
376 match self {
377 Self::Ssh(conn) => conn,
378 o => unreachable!("{o:?} is not an SSH connection"),
379 }
380 }
381
382 pub fn unwrap_csr(self) -> <InlinedConnection as ConnectionAccess>::Csr {
383 match self {
384 Self::Csr(conn) => conn,
385 o => unreachable!("{o:?} is not a Kafka connection"),
386 }
387 }
388
389 pub fn unwrap_glue_schema_registry(
390 self,
391 ) -> <InlinedConnection as ConnectionAccess>::GlueSchemaRegistry {
392 match self {
393 Self::GlueSchemaRegistry(conn) => conn,
394 o => unreachable!("{o:?} is not an AWS Glue Schema Registry connection"),
395 }
396 }
397
398 pub fn unwrap_iceberg_catalog(self) -> <InlinedConnection as ConnectionAccess>::IcebergCatalog {
399 match self {
400 Self::IcebergCatalog(conn) => conn,
401 o => unreachable!("{o:?} is not an Iceberg catalog connection"),
402 }
403 }
404}
405
/// An error produced while validating a connection.
#[derive(thiserror::Error, Debug)]
pub enum ConnectionValidationError {
    #[error(transparent)]
    Postgres(#[from] PostgresConnectionValidationError),
    #[error(transparent)]
    MySql(#[from] MySqlConnectionValidationError),
    #[error(transparent)]
    SqlServer(#[from] SqlServerConnectionValidationError),
    #[error(transparent)]
    Aws(#[from] AwsConnectionValidationError),
    /// Any other failure. It is displayed together with its full chain of causes.
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}
420
421impl ConnectionValidationError {
422 pub fn detail(&self) -> Option<String> {
424 match self {
425 ConnectionValidationError::Postgres(e) => e.detail(),
426 ConnectionValidationError::MySql(e) => e.detail(),
427 ConnectionValidationError::SqlServer(e) => e.detail(),
428 ConnectionValidationError::Aws(e) => e.detail(),
429 ConnectionValidationError::Other(_) => None,
430 }
431 }
432
433 pub fn hint(&self) -> Option<String> {
435 match self {
436 ConnectionValidationError::Postgres(e) => e.hint(),
437 ConnectionValidationError::MySql(e) => e.hint(),
438 ConnectionValidationError::SqlServer(e) => e.hint(),
439 ConnectionValidationError::Aws(e) => e.hint(),
440 ConnectionValidationError::Other(_) => None,
441 }
442 }
443}
444
445impl<C: ConnectionAccess> AlterCompatible for Connection<C> {
446 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
447 match (self, other) {
448 (Self::Aws(s), Self::Aws(o)) => s.alter_compatible(id, o),
449 (Self::AwsPrivatelink(s), Self::AwsPrivatelink(o)) => s.alter_compatible(id, o),
450 (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o),
451 (Self::Csr(s), Self::Csr(o)) => s.alter_compatible(id, o),
452 (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
453 (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
454 (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
455 _ => {
456 tracing::warn!(
457 "Connection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
458 self,
459 other
460 );
461 Err(AlterError { id })
462 }
463 }
464 }
465}
466
/// Configuration for a generic Iceberg REST catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct RestIcebergCatalog {
    /// Credential handed to the catalog under the `credential` property.
    /// It may be inline or a secret reference.
    pub credential: StringOrSecret,
    /// Optional value for the `scope` property.
    pub scope: Option<String>,
    /// Optional warehouse passed as the REST catalog warehouse property.
    pub warehouse: Option<String>,
}

/// Configuration for an Iceberg REST catalog served by AWS S3 Tables.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct S3TablesRestIcebergCatalog<C: ConnectionAccess = InlinedConnection> {
    /// AWS connection used to load the SDK config and credentials.
    pub aws_connection: AwsConnectionReference<C>,
    /// Warehouse passed as the REST catalog warehouse property.
    pub warehouse: String,
}
484
485impl<R: ConnectionResolver> IntoInlineConnection<S3TablesRestIcebergCatalog, R>
486 for S3TablesRestIcebergCatalog<ReferencedConnection>
487{
488 fn into_inline_connection(self, r: R) -> S3TablesRestIcebergCatalog {
489 S3TablesRestIcebergCatalog {
490 aws_connection: self.aws_connection.into_inline_connection(&r),
491 warehouse: self.warehouse,
492 }
493 }
494}
495
/// The flavor of an Iceberg catalog, as reported by
/// `IcebergCatalogConnection::catalog_type`.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogType {
    /// A generic Iceberg REST catalog.
    Rest,
    /// An Iceberg REST catalog served by AWS S3 Tables.
    S3TablesRest,
}

/// Flavor-specific configuration of an Iceberg catalog connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogImpl<C: ConnectionAccess = InlinedConnection> {
    /// A generic REST catalog authenticated with a credential property.
    Rest(RestIcebergCatalog),
    /// An S3 Tables REST catalog authenticated through an AWS connection.
    S3TablesRest(S3TablesRestIcebergCatalog<C>),
}
507
508impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogImpl, R>
509 for IcebergCatalogImpl<ReferencedConnection>
510{
511 fn into_inline_connection(self, r: R) -> IcebergCatalogImpl {
512 match self {
513 IcebergCatalogImpl::Rest(rest) => IcebergCatalogImpl::Rest(rest),
514 IcebergCatalogImpl::S3TablesRest(s3tables) => {
515 IcebergCatalogImpl::S3TablesRest(s3tables.into_inline_connection(r))
516 }
517 }
518 }
519}
520
/// A connection to an Iceberg catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergCatalogConnection<C: ConnectionAccess = InlinedConnection> {
    /// Flavor-specific catalog configuration.
    pub catalog: IcebergCatalogImpl<C>,
    /// Catalog endpoint, passed to the REST catalog builder as its URI property.
    pub uri: reqwest::Url,
}
528
529impl AlterCompatible for IcebergCatalogConnection {
530 fn alter_compatible(&self, id: GlobalId, _other: &Self) -> Result<(), AlterError> {
531 Err(AlterError { id })
532 }
533}
534
535impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogConnection, R>
536 for IcebergCatalogConnection<ReferencedConnection>
537{
538 fn into_inline_connection(self, r: R) -> IcebergCatalogConnection {
539 IcebergCatalogConnection {
540 catalog: self.catalog.into_inline_connection(&r),
541 uri: self.uri,
542 }
543 }
544}
545
546impl<C: ConnectionAccess> IcebergCatalogConnection<C> {
547 fn validate_by_default(&self) -> bool {
548 true
549 }
550}
551
552impl IcebergCatalogConnection<InlinedConnection> {
553 pub async fn connect(
554 &self,
555 storage_configuration: &StorageConfiguration,
556 in_task: InTask,
557 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
558 match self.catalog {
559 IcebergCatalogImpl::S3TablesRest(ref s3tables) => {
560 self.connect_s3tables(s3tables, storage_configuration, in_task)
561 .await
562 }
563 IcebergCatalogImpl::Rest(ref rest) => {
564 self.connect_rest(rest, storage_configuration, in_task)
565 .await
566 }
567 }
568 }
569
570 pub fn catalog_type(&self) -> IcebergCatalogType {
571 match self.catalog {
572 IcebergCatalogImpl::S3TablesRest(_) => IcebergCatalogType::S3TablesRest,
573 IcebergCatalogImpl::Rest(_) => IcebergCatalogType::Rest,
574 }
575 }
576
577 pub fn s3tables_catalog(&self) -> Option<&S3TablesRestIcebergCatalog> {
578 match &self.catalog {
579 IcebergCatalogImpl::S3TablesRest(s3tables) => Some(s3tables),
580 IcebergCatalogImpl::Rest(_) => None,
581 }
582 }
583
584 pub fn rest_catalog(&self) -> Option<&RestIcebergCatalog> {
585 match &self.catalog {
586 IcebergCatalogImpl::Rest(rest) => Some(rest),
587 IcebergCatalogImpl::S3TablesRest(_) => None,
588 }
589 }
590
591 async fn connect_s3tables(
592 &self,
593 s3tables: &S3TablesRestIcebergCatalog,
594 storage_configuration: &StorageConfiguration,
595 in_task: InTask,
596 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
597 let secret_reader = &storage_configuration.connection_context.secrets_reader;
598 let aws_ref = &s3tables.aws_connection;
599 let aws_config = aws_ref
600 .connection
601 .load_sdk_config(
602 &storage_configuration.connection_context,
603 aws_ref.connection_id,
604 in_task,
605 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
606 )
607 .await
608 .with_context(|| {
609 format!(
610 "failed to load AWS SDK config for S3 Tables Iceberg catalog \
611 (connection id: {}, auth method: {}, catalog uri: {}, warehouse: {})",
612 aws_ref.connection_id,
613 aws_ref.connection.auth_method(),
614 self.uri,
615 s3tables.warehouse
616 )
617 })?;
618
619 let aws_region = aws_ref
620 .connection
621 .region
622 .clone()
623 .unwrap_or_else(|| "us-east-1".to_string());
624
625 let mut props = vec![
626 (S3_REGION.to_string(), aws_region),
627 (S3_DISABLE_EC2_METADATA.to_string(), "true".to_string()),
628 (
629 REST_CATALOG_PROP_WAREHOUSE.to_string(),
630 s3tables.warehouse.clone(),
631 ),
632 (REST_CATALOG_PROP_URI.to_string(), self.uri.to_string()),
633 ];
634
635 let aws_auth = aws_ref.connection.auth.clone();
636
637 if let AwsAuth::Credentials(creds) = &aws_auth {
638 props.push((
639 S3_ACCESS_KEY_ID.to_string(),
640 creds
641 .access_key_id
642 .get_string(in_task, secret_reader)
643 .await?,
644 ));
645 props.push((
646 S3_SECRET_ACCESS_KEY.to_string(),
647 secret_reader.read_string(creds.secret_access_key).await?,
648 ));
649 }
650
651 let catalog = RestCatalogBuilder::default()
655 .with_aws_config(aws_config.clone())
656 .load("IcebergCatalog", props.into_iter().collect())
657 .await
658 .with_context(|| {
659 format!(
660 "failed to create S3 Tables Iceberg catalog \
661 (connection id: {}, catalog uri: {}, warehouse: {})",
662 aws_ref.connection_id, self.uri, s3tables.warehouse
663 )
664 })?;
665
666 let catalog = if matches!(aws_auth, AwsAuth::AssumeRole(_)) {
667 let credentials_provider = aws_config
668 .credentials_provider()
669 .ok_or_else(|| anyhow!("aws_config missing credentials provider"))?;
670 let file_io_loader = CustomAwsCredentialLoader::new(Arc::new(
671 AwsSdkCredentialLoader::new(credentials_provider),
672 ));
673 catalog.with_file_io_extension(file_io_loader)
674 } else {
675 catalog
676 };
677
678 Ok(Arc::new(catalog))
679 }
680
681 async fn connect_rest(
682 &self,
683 rest: &RestIcebergCatalog,
684 storage_configuration: &StorageConfiguration,
685 in_task: InTask,
686 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
687 let mut props = BTreeMap::from([(
688 REST_CATALOG_PROP_URI.to_string(),
689 self.uri.to_string().clone(),
690 )]);
691
692 if let Some(warehouse) = &rest.warehouse {
693 props.insert(REST_CATALOG_PROP_WAREHOUSE.to_string(), warehouse.clone());
694 }
695
696 let credential = rest
697 .credential
698 .get_string(
699 in_task,
700 &storage_configuration.connection_context.secrets_reader,
701 )
702 .await
703 .map_err(|e| anyhow!("failed to read Iceberg catalog credential: {e}"))?;
704 props.insert(REST_CATALOG_PROP_CREDENTIAL.to_string(), credential);
705
706 if let Some(scope) = &rest.scope {
707 props.insert(REST_CATALOG_PROP_SCOPE.to_string(), scope.clone());
708 }
709
710 let catalog = RestCatalogBuilder::default()
711 .load("IcebergCatalog", props.into_iter().collect())
712 .await
713 .map_err(|e| anyhow!("failed to create Iceberg catalog: {e}"))?;
714 Ok(Arc::new(catalog))
715 }
716
717 async fn validate(
718 &self,
719 _id: CatalogItemId,
720 storage_configuration: &StorageConfiguration,
721 ) -> Result<(), ConnectionValidationError> {
722 let catalog = self
723 .connect(storage_configuration, InTask::No)
724 .await
725 .map_err(|e| {
726 ConnectionValidationError::Other(anyhow!("failed to connect to catalog: {e}"))
727 })?;
728
729 catalog.list_namespaces(None).await.map_err(|e| {
731 ConnectionValidationError::Other(anyhow!("failed to list namespaces: {e}"))
732 })?;
733
734 Ok(())
735 }
736}
737
/// An AWS PrivateLink connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelinkConnection {
    /// Name of the endpoint service to connect to (presumably the VPC
    /// endpoint service name — confirm).
    pub service_name: String,
    /// Availability zones configured for the connection.
    pub availability_zones: Vec<String>,
}

impl AlterCompatible for AwsPrivatelinkConnection {
    /// Every change to a PrivateLink connection is treated as alter-compatible.
    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        Ok(())
    }
}
750
/// TLS settings for a Kafka connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaTlsConfig {
    /// Client identity, supplied to librdkafka as `ssl.key.pem` and
    /// `ssl.certificate.pem`.
    pub identity: Option<TlsIdentity>,
    /// Root certificate, supplied to librdkafka as `ssl.ca.pem`.
    pub root_cert: Option<StringOrSecret>,
}

/// SASL settings for a Kafka connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaSaslConfig<C: ConnectionAccess = InlinedConnection> {
    /// Supplied to librdkafka as `sasl.mechanisms`.
    pub mechanism: String,
    /// Supplied to librdkafka as `sasl.username`.
    pub username: StringOrSecret,
    /// Secret holding the password, supplied as `sasl.password` when present.
    pub password: Option<CatalogItemId>,
    /// Optional AWS connection whose SDK config is handed to the Kafka client
    /// context.
    pub aws: Option<AwsConnectionReference<C>>,
}
764
765impl<R: ConnectionResolver> IntoInlineConnection<KafkaSaslConfig, R>
766 for KafkaSaslConfig<ReferencedConnection>
767{
768 fn into_inline_connection(self, r: R) -> KafkaSaslConfig {
769 KafkaSaslConfig {
770 mechanism: self.mechanism,
771 username: self.username,
772 password: self.password,
773 aws: self.aws.map(|aws| aws.into_inline_connection(&r)),
774 }
775 }
776}
777
/// A statically configured Kafka broker.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaBroker<C: ConnectionAccess = InlinedConnection> {
    /// Broker address as `host[:port]`. `create_with_context` splits it at
    /// the first `:` and defaults the port to 9092.
    pub address: String,
    /// Tunnel used to reach this particular broker.
    pub tunnel: Tunnel<C>,
}
786
787impl<R: ConnectionResolver> IntoInlineConnection<KafkaBroker, R>
788 for KafkaBroker<ReferencedConnection>
789{
790 fn into_inline_connection(self, r: R) -> KafkaBroker {
791 let KafkaBroker { address, tunnel } = self;
792 KafkaBroker {
793 address,
794 tunnel: tunnel.into_inline_connection(r),
795 }
796 }
797}
798
/// Options for a Kafka topic. NOTE(review): presumably applied when topics are
/// created; the consumers are outside this chunk — confirm.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Default)]
pub struct KafkaTopicOptions {
    /// Optional replication factor; `None` presumably defers to the broker default.
    pub replication_factor: Option<NonNeg<i32>>,
    /// Optional partition count; `None` presumably defers to the broker default.
    pub partition_count: Option<NonNeg<i32>>,
    /// Additional topic-level configuration entries.
    pub topic_config: BTreeMap<String, String>,
}

/// A connection to a Kafka cluster.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaConnection<C: ConnectionAccess = InlinedConnection> {
    /// Statically configured brokers; their addresses form `bootstrap.servers`.
    pub brokers: Vec<KafkaBroker<C>>,
    /// Tunnel used for brokers that are not individually configured.
    pub default_tunnel: Tunnel<C>,
    /// Explicit progress topic name; see `KafkaConnection::progress_topic`
    /// for the generated fallback.
    pub progress_topic: Option<String>,
    /// Options for the progress topic.
    pub progress_topic_options: KafkaTopicOptions,
    /// Extra librdkafka options, which may reference secrets.
    pub options: BTreeMap<String, StringOrSecret>,
    /// TLS settings; their presence selects an SSL security protocol.
    pub tls: Option<KafkaTlsConfig>,
    /// SASL settings; their presence selects a SASL security protocol.
    pub sasl: Option<KafkaSaslConfig<C>>,
}
824
825impl<R: ConnectionResolver> IntoInlineConnection<KafkaConnection, R>
826 for KafkaConnection<ReferencedConnection>
827{
828 fn into_inline_connection(self, r: R) -> KafkaConnection {
829 let KafkaConnection {
830 brokers,
831 progress_topic,
832 progress_topic_options,
833 default_tunnel,
834 options,
835 tls,
836 sasl,
837 } = self;
838
839 let brokers = brokers
840 .into_iter()
841 .map(|broker| broker.into_inline_connection(&r))
842 .collect();
843
844 KafkaConnection {
845 brokers,
846 progress_topic,
847 progress_topic_options,
848 default_tunnel: default_tunnel.into_inline_connection(&r),
849 options,
850 tls,
851 sasl: sasl.map(|sasl| sasl.into_inline_connection(&r)),
852 }
853 }
854}
855
856impl<C: ConnectionAccess> KafkaConnection<C> {
857 pub fn progress_topic(
862 &self,
863 connection_context: &ConnectionContext,
864 connection_id: CatalogItemId,
865 ) -> Cow<'_, str> {
866 if let Some(progress_topic) = &self.progress_topic {
867 Cow::Borrowed(progress_topic)
868 } else {
869 Cow::Owned(format!(
870 "_materialize-progress-{}-{}",
871 connection_context.environment_id, connection_id,
872 ))
873 }
874 }
875
876 fn validate_by_default(&self) -> bool {
877 true
878 }
879}
880
881impl KafkaConnection {
882 pub fn id_base(
886 connection_context: &ConnectionContext,
887 connection_id: CatalogItemId,
888 object_id: GlobalId,
889 ) -> String {
890 format!(
891 "materialize-{}-{}-{}",
892 connection_context.environment_id, connection_id, object_id,
893 )
894 }
895
896 pub fn enrich_client_id(&self, configs: &ConfigSet, client_id: &mut String) {
899 #[derive(Debug, Deserialize)]
900 struct EnrichmentRule {
901 #[serde(deserialize_with = "deserialize_regex")]
902 pattern: Regex,
903 payload: String,
904 }
905
906 fn deserialize_regex<'de, D>(deserializer: D) -> Result<Regex, D::Error>
907 where
908 D: Deserializer<'de>,
909 {
910 let buf = String::deserialize(deserializer)?;
911 Regex::new(&buf).map_err(serde::de::Error::custom)
912 }
913
914 let rules = KAFKA_CLIENT_ID_ENRICHMENT_RULES.get(configs);
915 let rules = match serde_json::from_value::<Vec<EnrichmentRule>>(rules) {
916 Ok(rules) => rules,
917 Err(e) => {
918 warn!(%e, "failed to decode kafka_client_id_enrichment_rules");
919 return;
920 }
921 };
922
923 debug!(?self.brokers, "evaluating client ID enrichment rules");
928 for rule in rules {
929 let is_match = self
930 .brokers
931 .iter()
932 .any(|b| rule.pattern.is_match(&b.address));
933 debug!(?rule, is_match, "evaluated client ID enrichment rule");
934 if is_match {
935 client_id.push('-');
936 client_id.push_str(&rule.payload);
937 }
938 }
939 }
940
941 pub async fn create_with_context<C, T>(
943 &self,
944 storage_configuration: &StorageConfiguration,
945 context: C,
946 extra_options: &BTreeMap<&str, String>,
947 in_task: InTask,
948 ) -> Result<T, ContextCreationError>
949 where
950 C: ClientContext,
951 T: FromClientConfigAndContext<TunnelingClientContext<C>>,
952 {
953 let mut options = self.options.clone();
954
955 options.insert("allow.auto.create.topics".into(), "false".into());
960
961 let brokers = match &self.default_tunnel {
962 Tunnel::AwsPrivatelink(t) => {
963 assert!(&self.brokers.is_empty());
964
965 let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM
966 .get(storage_configuration.config_set());
967 options.insert("ssl.endpoint.identification.algorithm".into(), algo.into());
968
969 format!(
972 "{}:{}",
973 vpc_endpoint_host(
974 t.connection_id,
975 None, ),
977 t.port.unwrap_or(9092)
978 )
979 }
980 Tunnel::AwsPrivatelinks(_pl) => {
981 let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM
982 .get(storage_configuration.config_set());
983 options.insert("ssl.endpoint.identification.algorithm".into(), algo.into());
984
985 if self.brokers.is_empty() {
986 return Err(ContextCreationError::Other(anyhow::anyhow!(
987 "at least one static broker is required when using BROKER or BROKERS"
988 )));
989 }
990 self.brokers.iter().map(|b| &b.address).join(",")
991 }
992 _ => self.brokers.iter().map(|b| &b.address).join(","),
993 };
994 options.insert("bootstrap.servers".into(), brokers.clone().into());
995 let security_protocol = match (self.tls.is_some(), self.sasl.is_some()) {
996 (false, false) => "PLAINTEXT",
997 (true, false) => "SSL",
998 (false, true) => "SASL_PLAINTEXT",
999 (true, true) => "SASL_SSL",
1000 };
1001 info!(
1002 "kafka: create_with_context bootstrap.servers={brokers}, security_protocol={security_protocol}"
1003 );
1004 options.insert("security.protocol".into(), security_protocol.into());
1005 if let Some(tls) = &self.tls {
1006 if let Some(root_cert) = &tls.root_cert {
1007 options.insert("ssl.ca.pem".into(), root_cert.clone());
1008 }
1009 if let Some(identity) = &tls.identity {
1010 options.insert("ssl.key.pem".into(), StringOrSecret::Secret(identity.key));
1011 options.insert("ssl.certificate.pem".into(), identity.cert.clone());
1012 }
1013 }
1014 if let Some(sasl) = &self.sasl {
1015 options.insert("sasl.mechanisms".into(), (&sasl.mechanism).into());
1016 options.insert("sasl.username".into(), sasl.username.clone());
1017 if let Some(password) = sasl.password {
1018 options.insert("sasl.password".into(), StringOrSecret::Secret(password));
1019 }
1020 }
1021
1022 options.insert(
1023 "retry.backoff.ms".into(),
1024 KAFKA_RETRY_BACKOFF
1025 .get(storage_configuration.config_set())
1026 .as_millis()
1027 .into(),
1028 );
1029 options.insert(
1030 "retry.backoff.max.ms".into(),
1031 KAFKA_RETRY_BACKOFF_MAX
1032 .get(storage_configuration.config_set())
1033 .as_millis()
1034 .into(),
1035 );
1036 options.insert(
1037 "reconnect.backoff.ms".into(),
1038 KAFKA_RECONNECT_BACKOFF
1039 .get(storage_configuration.config_set())
1040 .as_millis()
1041 .into(),
1042 );
1043 options.insert(
1044 "reconnect.backoff.max.ms".into(),
1045 KAFKA_RECONNECT_BACKOFF_MAX
1046 .get(storage_configuration.config_set())
1047 .as_millis()
1048 .into(),
1049 );
1050
1051 let mut config = mz_kafka_util::client::create_new_client_config(
1052 storage_configuration
1053 .connection_context
1054 .librdkafka_log_level,
1055 storage_configuration.parameters.kafka_timeout_config,
1056 );
1057 for (k, v) in options {
1058 config.set(
1059 k,
1060 v.get_string(
1061 in_task,
1062 &storage_configuration.connection_context.secrets_reader,
1063 )
1064 .await
1065 .context("reading kafka secret")?,
1066 );
1067 }
1068 for (k, v) in extra_options {
1069 config.set(*k, v);
1070 }
1071
1072 let aws_config = match self.sasl.as_ref().and_then(|sasl| sasl.aws.as_ref()) {
1073 None => None,
1074 Some(aws) => Some(
1075 aws.connection
1076 .load_sdk_config(
1077 &storage_configuration.connection_context,
1078 aws.connection_id,
1079 in_task,
1080 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1081 )
1082 .await?,
1083 ),
1084 };
1085
1086 let mut context = TunnelingClientContext::new(
1090 context,
1091 Handle::current(),
1092 storage_configuration
1093 .connection_context
1094 .ssh_tunnel_manager
1095 .clone(),
1096 storage_configuration.parameters.ssh_timeout_config,
1097 aws_config,
1098 in_task,
1099 );
1100
1101 match &self.default_tunnel {
1102 Tunnel::Direct => {
1103 }
1105 Tunnel::AwsPrivatelink(pl) => {
1106 context.set_default_tunnel(TunnelConfig::StaticHost(
1107 KafkaConnection::from_default_aws_privatelink(pl).host,
1109 ));
1110 }
1111 Tunnel::AwsPrivatelinks(pl) => {
1112 context.set_default_tunnel(TunnelConfig::Rules(
1113 KafkaConnection::from_aws_privatelinks(pl),
1114 ));
1115 }
1116 Tunnel::Ssh(ssh_tunnel) => {
1117 let secret = storage_configuration
1118 .connection_context
1119 .secrets_reader
1120 .read_in_task_if(in_task, ssh_tunnel.connection_id)
1121 .await?;
1122 let key_pair = SshKeyPair::from_bytes(&secret)?;
1123
1124 let resolved = resolve_address(
1126 &ssh_tunnel.connection.host,
1127 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1128 )
1129 .await?;
1130 context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig {
1131 host: resolved
1132 .iter()
1133 .map(|a| a.to_string())
1134 .collect::<BTreeSet<_>>(),
1135 port: ssh_tunnel.connection.port,
1136 user: ssh_tunnel.connection.user.clone(),
1137 key_pair,
1138 }));
1139 }
1140 }
1141 info!(
1142 "kafka: tunnel config set to {}",
1143 match &self.default_tunnel {
1144 Tunnel::Direct => "Direct".to_string(),
1145 Tunnel::AwsPrivatelink(_) => "AwsPrivatelink (static host)".to_string(),
1146 Tunnel::AwsPrivatelinks(pl) =>
1147 format!("AwsPrivatelinks ({} rules)", pl.rules.len()),
1148 Tunnel::Ssh(_) => "Ssh".to_string(),
1149 }
1150 );
1151
1152 for broker in &self.brokers {
1155 let mut addr_parts = broker.address.splitn(2, ':');
1156 let addr = BrokerAddr {
1157 host: addr_parts
1158 .next()
1159 .context("BROKER is not address:port")?
1160 .into(),
1161 port: addr_parts
1162 .next()
1163 .unwrap_or("9092")
1164 .parse()
1165 .context("parsing BROKER port")?,
1166 };
1167 match &broker.tunnel {
1168 Tunnel::Direct => {
1169 }
1179 Tunnel::AwsPrivatelink(aws_privatelink) => {
1180 context.add_broker_rewrite(
1181 addr,
1182 KafkaConnection::from_aws_privatelink(aws_privatelink),
1183 );
1184 }
1185 Tunnel::AwsPrivatelinks(_) => unreachable!(
1186 "Individually predefined brokers do not use rule-based PrivateLinks routing."
1187 ),
1188 Tunnel::Ssh(ssh_tunnel) => {
1189 let ssh_host_resolved = resolve_address(
1191 &ssh_tunnel.connection.host,
1192 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1193 )
1194 .await?;
1195 context
1196 .add_ssh_tunnel(
1197 addr,
1198 SshTunnelConfig {
1199 host: ssh_host_resolved
1200 .iter()
1201 .map(|a| a.to_string())
1202 .collect::<BTreeSet<_>>(),
1203 port: ssh_tunnel.connection.port,
1204 user: ssh_tunnel.connection.user.clone(),
1205 key_pair: SshKeyPair::from_bytes(
1206 &storage_configuration
1207 .connection_context
1208 .secrets_reader
1209 .read_in_task_if(in_task, ssh_tunnel.connection_id)
1210 .await?,
1211 )?,
1212 },
1213 )
1214 .await
1215 .map_err(ContextCreationError::Ssh)?;
1216 }
1217 }
1218 }
1219
1220 Ok(config.create_with_context(context)?)
1221 }
1222
    /// Validates the Kafka connection by creating a consumer and fetching
    /// cluster metadata through it.
    ///
    /// The consumer is built with `create_with_context` using no extra
    /// options and `InTask::No`. Metadata is fetched with the configured
    /// `kafka_timeout_config.fetch_metadata_timeout`.
    ///
    /// # Errors
    ///
    /// Returns an error if the consumer cannot be created or if
    /// `fetch_metadata` fails. On failure, an error reported through the
    /// client context's error channel is preferred over the error returned by
    /// `fetch_metadata`; that error is used only if the channel is empty.
    async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<(), anyhow::Error> {
        // `error_rx` collects errors that the client context reports while
        // the consumer is running.
        let (context, error_rx) = MzClientContext::with_errors();
        let consumer: BaseConsumer<_> = self
            .create_with_context(
                storage_configuration,
                context,
                &BTreeMap::new(),
                InTask::No,
            )
            .await?;
        // Shared so the blocking task below can own a handle while this
        // function keeps its own handle to drop explicitly later.
        let consumer = Arc::new(consumer);

        let timeout = storage_configuration
            .parameters
            .kafka_timeout_config
            .fetch_metadata_timeout;

        info!("kafka: starting connection validation via fetch_metadata (timeout={timeout:?})");
        // `fetch_metadata` blocks for up to `timeout`, so it runs on a
        // blocking task instead of on the async executor.
        let result = mz_ore::task::spawn_blocking(|| "kafka_get_metadata", {
            let consumer = Arc::clone(&consumer);
            move || consumer.fetch_metadata(None, timeout)
        })
        .await;
        info!(
            "kafka: connection validation result: {}",
            if result.is_ok() { "success" } else { "failed" },
        );
        match result {
            Ok(_) => Ok(()),
            Err(err) => {
                // Drain the errors buffered so far. An `Internal` error is
                // replaced by whichever error follows it, and the first
                // non-`Internal` error is kept. If every error is `Internal`,
                // the last one wins.
                let main_err = error_rx.try_iter().reduce(|cur, new| match cur {
                    MzKafkaError::Internal(_) => new,
                    _ => cur,
                });

                // Drop the consumer only after the error channel has been
                // drained. NOTE(review): presumably this keeps consumer
                // teardown from adding spurious errors to the channel; confirm.
                drop(consumer);

                match main_err {
                    Some(err) => Err(err.into()),
                    None => Err(err.into()),
                }
            }
        }
    }
1292
1293 fn from_default_aws_privatelink(pl: &AwsPrivatelink) -> BrokerRewrite {
1295 BrokerRewrite {
1296 host: vpc_endpoint_host(
1297 pl.connection_id,
1298 None, ),
1300 port: pl.port,
1301 }
1302 }
1303
1304 fn from_aws_privatelink(pl: &AwsPrivatelink) -> BrokerRewrite {
1306 BrokerRewrite {
1307 host: vpc_endpoint_host(pl.connection_id, pl.availability_zone.as_deref()),
1308 port: pl.port,
1309 }
1310 }
1311
1312 fn from_aws_privatelink_rule(
1313 AwsPrivatelinkRule { pattern, to }: &AwsPrivatelinkRule,
1314 ) -> (mz_kafka_util::client::ConnectionRulePattern, BrokerRewrite) {
1315 (
1316 mz_kafka_util::client::ConnectionRulePattern {
1317 prefix_wildcard: pattern.prefix_wildcard,
1318 literal_match: pattern.literal_match.clone(),
1319 suffix_wildcard: pattern.suffix_wildcard,
1320 },
1321 KafkaConnection::from_aws_privatelink(to),
1322 )
1323 }
1324
1325 fn from_aws_privatelinks(pl: &AwsPrivatelinks) -> HostMappingRules {
1326 HostMappingRules {
1327 rules: pl
1328 .rules
1329 .iter()
1330 .map(KafkaConnection::from_aws_privatelink_rule)
1331 .collect_vec(),
1332 }
1333 }
1334}
1335
1336impl<C: ConnectionAccess> AlterCompatible for KafkaConnection<C> {
1337 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1338 let KafkaConnection {
1339 brokers: _,
1340 default_tunnel: _,
1341 progress_topic,
1342 progress_topic_options,
1343 options: _,
1344 tls: _,
1345 sasl: _,
1346 } = self;
1347
1348 let compatibility_checks = [
1349 (progress_topic == &other.progress_topic, "progress_topic"),
1350 (
1351 progress_topic_options == &other.progress_topic_options,
1352 "progress_topic_options",
1353 ),
1354 ];
1355
1356 for (compatible, field) in compatibility_checks {
1357 if !compatible {
1358 tracing::warn!(
1359 "KafkaConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1360 self,
1361 other
1362 );
1363
1364 return Err(AlterError { id });
1365 }
1366 }
1367
1368 Ok(())
1369 }
1370}
1371
/// Connection details for a schema registry reached through an `mz_ccsr` client.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnection<C: ConnectionAccess = InlinedConnection> {
    /// The registry URL; its host is also what tunnels and address resolution target.
    pub url: Url,
    /// Extra PEM root certificate to trust, given inline or as a secret.
    pub tls_root_cert: Option<StringOrSecret>,
    /// Client TLS identity (certificate plus private-key secret) to present.
    pub tls_identity: Option<TlsIdentity>,
    /// HTTP authentication credentials.
    pub http_auth: Option<CsrConnectionHttpAuth>,
    /// How the registry host is reached.
    pub tunnel: Tunnel<C>,
}
1387
1388impl<R: ConnectionResolver> IntoInlineConnection<CsrConnection, R>
1389 for CsrConnection<ReferencedConnection>
1390{
1391 fn into_inline_connection(self, r: R) -> CsrConnection {
1392 let CsrConnection {
1393 url,
1394 tls_root_cert,
1395 tls_identity,
1396 http_auth,
1397 tunnel,
1398 } = self;
1399 CsrConnection {
1400 url,
1401 tls_root_cert,
1402 tls_identity,
1403 http_auth,
1404 tunnel: tunnel.into_inline_connection(r),
1405 }
1406 }
1407}
1408
impl<C: ConnectionAccess> CsrConnection<C> {
    /// Whether this connection is validated by default. Always `true` for
    /// schema registry connections.
    fn validate_by_default(&self) -> bool {
        true
    }
}
1414
impl CsrConnection {
    /// Builds a schema registry client for this connection.
    ///
    /// Reads the optional TLS root certificate, TLS client identity, and HTTP
    /// auth credentials (inline or from secrets). It then configures how the
    /// URL's host is reached according to `self.tunnel`.
    ///
    /// # Errors
    ///
    /// Returns a [`CsrConnectError`] if any of the following fails:
    /// - reading a secret;
    /// - parsing a certificate or identity;
    /// - the URL has no host;
    /// - resolving an address;
    /// - establishing the SSH tunnel;
    /// - building the client.
    ///
    /// # Panics
    ///
    /// Panics if a PrivateLink tunnel specifies a port. Also panics for
    /// `Tunnel::AwsPrivatelinks`, which only Kafka connections support.
    pub async fn connect(
        &self,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_ccsr::Client, CsrConnectError> {
        let mut client_config = mz_ccsr::ClientConfig::new(self.url.clone());
        // Trust an additional PEM root certificate, if configured.
        if let Some(root_cert) = &self.tls_root_cert {
            let root_cert = root_cert
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let root_cert = Certificate::from_pem(root_cert.as_bytes())?;
            client_config = client_config.add_root_certificate(root_cert);
        }

        // Present a client identity. The private key always comes from a
        // secret; the certificate may be inline or a secret.
        if let Some(tls_identity) = &self.tls_identity {
            let key = &storage_configuration
                .connection_context
                .secrets_reader
                .read_string_in_task_if(in_task, tls_identity.key)
                .await?;
            let cert = tls_identity
                .cert
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let ident = Identity::from_pem(key.as_bytes(), cert.as_bytes())?;
            client_config = client_config.identity(ident);
        }

        // HTTP auth: the username may be inline or a secret. The optional
        // password is always a secret.
        if let Some(http_auth) = &self.http_auth {
            let username = http_auth
                .username
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let password = match http_auth.password {
                None => None,
                Some(password) => Some(
                    storage_configuration
                        .connection_context
                        .secrets_reader
                        .read_string_in_task_if(in_task, password)
                        .await?,
                ),
            };
            client_config = client_config.auth(username, password);
        }

        let host = self
            .url
            .host_str()
            .ok_or_else(|| anyhow!("url missing host"))?;
        match &self.tunnel {
            Tunnel::Direct => {
                // Resolve the host here and pin the client to the resolved
                // addresses. The `ENFORCE_EXTERNAL_ADDRESSES` setting is passed
                // to `resolve_address`.
                // NOTE(review): the port 0 looks like a placeholder, presumably
                // ignored in favor of the URL's port; confirm.
                let resolved = resolve_address(
                    host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                client_config = client_config.resolve_to_addrs(
                    host,
                    &resolved
                        .iter()
                        .map(|addr| SocketAddr::new(*addr, 0))
                        .collect::<Vec<_>>(),
                )
            }
            Tunnel::Ssh(ssh_tunnel) => {
                // Open (or reuse) an SSH tunnel to the registry's host and port.
                // The port falls back to 80 if the URL has no known default.
                let ssh_tunnel = ssh_tunnel
                    .connect(
                        storage_configuration,
                        host,
                        self.url.port_or_known_default().unwrap_or(80),
                        in_task,
                    )
                    .await
                    .map_err(CsrConnectError::Ssh)?;

                // Send traffic for `host` to the tunnel's local IP. The URL is
                // produced by a closure that reads the tunnel's local port on
                // every call, rather than fixing it once here.
                // NOTE(review): the `move` closure takes ownership of the
                // tunnel handle, which presumably keeps the tunnel alive for as
                // long as the client holds the closure; confirm.
                client_config = client_config
                    .resolve_to_addrs(host, &[SocketAddr::new(ssh_tunnel.local_addr().ip(), 0)])
                    .dynamic_url({
                        let remote_url = self.url.clone();
                        move || {
                            let mut url = remote_url.clone();
                            // `set_port` only fails for URLs that cannot carry a
                            // port. This URL has a host (checked above).
                            url.set_port(Some(ssh_tunnel.local_addr().port()))
                                .expect("cannot fail");
                            url
                        }
                    });
            }
            Tunnel::AwsPrivatelink(connection) => {
                assert_none!(connection.port);

                // Resolve the VPC endpoint host (including the availability
                // zone, if any) and send traffic for `host` to those addresses.
                let privatelink_host = mz_cloud_resources::vpc_endpoint_host(
                    connection.connection_id,
                    connection.availability_zone.as_deref(),
                );
                let addrs: Vec<_> = net::lookup_host((privatelink_host, 0))
                    .await
                    .context("resolving PrivateLink host")?
                    .collect();
                client_config = client_config.resolve_to_addrs(host, &addrs)
            }
            Tunnel::AwsPrivatelinks(_) => {
                unreachable!("MATCHING broker rules are only available for Kafka connections.");
            }
        }

        Ok(client_config.build()?)
    }

    /// Validates the connection by connecting with `InTask::No` and listing
    /// the registry's subjects as a round-trip check.
    async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<(), anyhow::Error> {
        let client = self
            .connect(
                storage_configuration,
                InTask::No,
            )
            .await?;
        client.list_subjects().await?;
        Ok(())
    }
}
1576
1577impl<C: ConnectionAccess> AlterCompatible for CsrConnection<C> {
1578 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1579 let CsrConnection {
1580 tunnel,
1581 url: _,
1583 tls_root_cert: _,
1584 tls_identity: _,
1585 http_auth: _,
1586 } = self;
1587
1588 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1589
1590 for (compatible, field) in compatibility_checks {
1591 if !compatible {
1592 tracing::warn!(
1593 "CsrConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1594 self,
1595 other
1596 );
1597
1598 return Err(AlterError { id });
1599 }
1600 }
1601 Ok(())
1602 }
1603}
1604
/// Connection details for an AWS Glue schema registry.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct GlueSchemaRegistryConnection<C: ConnectionAccess = InlinedConnection> {
    /// Reference to the AWS connection associated with this registry.
    pub aws_connection: AwsConnectionReference<C>,
    /// Name of the registry. `ALTER` may not change it (see `alter_compatible`).
    pub registry_name: String,
}
1623
1624impl<R: ConnectionResolver> IntoInlineConnection<GlueSchemaRegistryConnection, R>
1625 for GlueSchemaRegistryConnection<ReferencedConnection>
1626{
1627 fn into_inline_connection(self, r: R) -> GlueSchemaRegistryConnection {
1628 let GlueSchemaRegistryConnection {
1629 aws_connection,
1630 registry_name,
1631 } = self;
1632 GlueSchemaRegistryConnection {
1633 aws_connection: aws_connection.into_inline_connection(&r),
1634 registry_name,
1635 }
1636 }
1637}
1638
impl<C: ConnectionAccess> GlueSchemaRegistryConnection<C> {
    /// Whether this connection is validated by default. Always `true`.
    fn validate_by_default(&self) -> bool {
        true
    }
}
1649
1650impl GlueSchemaRegistryConnection {
1651 async fn validate(
1652 &self,
1653 _id: CatalogItemId,
1654 _storage_configuration: &StorageConfiguration,
1655 ) -> Result<(), anyhow::Error> {
1656 std::future::ready(Ok(())).await
1662 }
1663}
1664
1665impl<C: ConnectionAccess> AlterCompatible for GlueSchemaRegistryConnection<C> {
1666 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1667 let GlueSchemaRegistryConnection {
1668 registry_name,
1669 aws_connection: _,
1672 } = self;
1673
1674 let compatibility_checks = [(registry_name == &other.registry_name, "registry_name")];
1675
1676 for (compatible, field) in compatibility_checks {
1677 if !compatible {
1678 tracing::warn!(
1679 "GlueSchemaRegistryConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1680 self,
1681 other
1682 );
1683
1684 return Err(AlterError { id });
1685 }
1686 }
1687 Ok(())
1688 }
1689}
1690
/// A TLS client identity: a PEM certificate plus its private key.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct TlsIdentity {
    /// The PEM certificate, given inline or as a secret.
    pub cert: StringOrSecret,
    /// ID of the secret that holds the PEM private key.
    pub key: CatalogItemId,
}
1700
/// HTTP authentication credentials for a schema registry connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnectionHttpAuth {
    /// The username, given inline or as a secret.
    pub username: StringOrSecret,
    /// ID of the secret that holds the password, if any.
    pub password: Option<CatalogItemId>,
}
1709
/// Connection details for a PostgreSQL server.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct PostgresConnection<C: ConnectionAccess = InlinedConnection> {
    /// Server host name or address.
    pub host: String,
    /// Server port.
    pub port: u16,
    /// Database to connect to.
    pub database: String,
    /// User name, given inline or as a secret.
    pub user: StringOrSecret,
    /// ID of the secret that holds the password, if any.
    pub password: Option<CatalogItemId>,
    /// How the server is reached.
    pub tunnel: Tunnel<C>,
    /// TLS mode passed to the `tokio_postgres` configuration.
    pub tls_mode: SslMode,
    /// Root certificate to trust, given inline or as a secret.
    pub tls_root_cert: Option<StringOrSecret>,
    /// Client TLS identity to present.
    pub tls_identity: Option<TlsIdentity>,
}
1733
1734impl<R: ConnectionResolver> IntoInlineConnection<PostgresConnection, R>
1735 for PostgresConnection<ReferencedConnection>
1736{
1737 fn into_inline_connection(self, r: R) -> PostgresConnection {
1738 let PostgresConnection {
1739 host,
1740 port,
1741 database,
1742 user,
1743 password,
1744 tunnel,
1745 tls_mode,
1746 tls_root_cert,
1747 tls_identity,
1748 } = self;
1749
1750 PostgresConnection {
1751 host,
1752 port,
1753 database,
1754 user,
1755 password,
1756 tunnel: tunnel.into_inline_connection(r),
1757 tls_mode,
1758 tls_root_cert,
1759 tls_identity,
1760 }
1761 }
1762}
1763
impl<C: ConnectionAccess> PostgresConnection<C> {
    /// Whether this connection is validated by default. Always `true`.
    fn validate_by_default(&self) -> bool {
        true
    }
}
1769
impl PostgresConnection<InlinedConnection> {
    /// Builds the configuration used to connect to this PostgreSQL server.
    ///
    /// Credentials and TLS material are read through `secrets_reader`, or
    /// taken inline when given as plain strings. Client-side connection and
    /// TCP settings come from the `pg_source_*` storage parameters.
    ///
    /// `pg_source_wal_sender_timeout` is always sent to the server as a
    /// startup option. When `pg_source_tcp_configure_server` is set, the TCP
    /// keepalive and user-timeout values are sent as startup options too.
    ///
    /// The tunnel is converted into a `mz_postgres_util::TunnelConfig`.
    ///
    /// # Errors
    ///
    /// Returns an error if a secret cannot be read, an address cannot be
    /// resolved, the SSH key pair cannot be decoded, or
    /// `mz_postgres_util::Config::new` fails.
    ///
    /// # Panics
    ///
    /// Panics if a PrivateLink tunnel specifies a port. Also panics for
    /// `Tunnel::AwsPrivatelinks`, which only Kafka connections support.
    pub async fn config(
        &self,
        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_postgres_util::Config, anyhow::Error> {
        let params = &storage_configuration.parameters;

        let mut config = tokio_postgres::Config::new();
        config
            .host(&self.host)
            .port(self.port)
            .dbname(&self.database)
            .user(&self.user.get_string(in_task, secrets_reader).await?)
            .ssl_mode(self.tls_mode);
        if let Some(password) = self.password {
            let password = secrets_reader
                .read_string_in_task_if(in_task, password)
                .await?;
            config.password(password);
        }
        if let Some(tls_root_cert) = &self.tls_root_cert {
            let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
            config.ssl_root_cert(tls_root_cert.as_bytes());
        }
        if let Some(tls_identity) = &self.tls_identity {
            let cert = tls_identity
                .cert
                .get_string(in_task, secrets_reader)
                .await?;
            let key = secrets_reader
                .read_string_in_task_if(in_task, tls_identity.key)
                .await?;
            config.ssl_cert(cert.as_bytes()).ssl_key(key.as_bytes());
        }

        // Client-side connection and TCP settings. Each one is applied only
        // when its storage parameter is set.
        if let Some(connect_timeout) = params.pg_source_connect_timeout {
            config.connect_timeout(connect_timeout);
        }
        if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
            config.keepalives_retries(keepalives_retries);
        }
        if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
            config.keepalives_idle(keepalives_idle);
        }
        if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
            config.keepalives_interval(keepalives_interval);
        }
        if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
            config.tcp_user_timeout(tcp_user_timeout);
        }

        // Server-side settings, sent as space-separated `--name=value` startup
        // options. Units follow the conversions below: milliseconds for
        // wal_sender_timeout and tcp_user_timeout, seconds for the keepalive
        // idle and interval.
        let mut options = vec![];
        if let Some(wal_sender_timeout) = params.pg_source_wal_sender_timeout {
            options.push(format!(
                "--wal_sender_timeout={}",
                wal_sender_timeout.as_millis()
            ));
        };
        // Optionally mirror the client-side TCP settings on the server.
        if params.pg_source_tcp_configure_server {
            if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
                options.push(format!("--tcp_keepalives_count={}", keepalives_retries));
            }
            if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
                options.push(format!(
                    "--tcp_keepalives_idle={}",
                    keepalives_idle.as_secs()
                ));
            }
            if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
                options.push(format!(
                    "--tcp_keepalives_interval={}",
                    keepalives_interval.as_secs()
                ));
            }
            if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
                options.push(format!(
                    "--tcp_user_timeout={}",
                    tcp_user_timeout.as_millis()
                ));
            }
        }
        config.options(options.join(" ").as_str());

        // Work out how to reach the server. Host names are resolved with
        // `resolve_address`, which is given the `ENFORCE_EXTERNAL_ADDRESSES`
        // setting.
        let tunnel = match &self.tunnel {
            Tunnel::Direct => {
                let resolved = resolve_address(
                    &self.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_postgres_util::TunnelConfig::Direct {
                    resolved_ips: Some(resolved),
                }
            }
            Tunnel::Ssh(SshTunnel {
                connection_id,
                connection,
            }) => {
                // The SSH key pair is stored in the secret keyed by the SSH
                // connection's ID.
                let secret = secrets_reader
                    .read_in_task_if(in_task, *connection_id)
                    .await?;
                let key_pair = SshKeyPair::from_bytes(&secret)?;
                let resolved = resolve_address(
                    &connection.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_postgres_util::TunnelConfig::Ssh {
                    config: SshTunnelConfig {
                        host: resolved
                            .iter()
                            .map(|a| a.to_string())
                            .collect::<BTreeSet<_>>(),
                        port: connection.port,
                        user: connection.user.clone(),
                        key_pair,
                    },
                }
            }
            Tunnel::AwsPrivatelink(connection) => {
                assert_none!(connection.port);
                mz_postgres_util::TunnelConfig::AwsPrivatelink {
                    connection_id: connection.connection_id,
                }
            }
            Tunnel::AwsPrivatelinks(_) => {
                unreachable!("MATCHING broker rules are only available for Kafka connections.");
            }
        };

        Ok(mz_postgres_util::Config::new(
            config,
            tunnel,
            params.ssh_timeout_config,
            in_task,
        )?)
    }

    /// Validates the connection by connecting and checking that the server is
    /// set up for logical replication.
    ///
    /// Checks run in this order, and the first failure is returned:
    /// 1. `wal_level` is at least `logical`.
    /// 2. `max_wal_senders` is at least 1.
    /// 3. At least 2 replication slots are available.
    ///
    /// On success, returns the connected client.
    pub async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<mz_postgres_util::Client, anyhow::Error> {
        let config = self
            .config(
                &storage_configuration.connection_context.secrets_reader,
                storage_configuration,
                InTask::No,
            )
            .await?;
        let client = config
            .connect(
                "connection validation",
                &storage_configuration.connection_context.ssh_tunnel_manager,
            )
            .await?;

        let wal_level = mz_postgres_util::get_wal_level(&client).await?;

        if wal_level < mz_postgres_util::replication::WalLevel::Logical {
            Err(PostgresConnectionValidationError::InsufficientWalLevel { wal_level })?;
        }

        let max_wal_senders = mz_postgres_util::get_max_wal_senders(&client).await?;

        if max_wal_senders < 1 {
            Err(PostgresConnectionValidationError::ReplicationDisabled)?;
        }

        let available_replication_slots =
            mz_postgres_util::available_replication_slots(&client).await?;

        // Two free slots are required; the error reports that requirement
        // in `count`.
        if available_replication_slots < 2 {
            Err(
                PostgresConnectionValidationError::InsufficientReplicationSlotsAvailable {
                    count: 2,
                },
            )?;
        }

        Ok(client)
    }
}
1959
/// Reasons a PostgreSQL server can fail connection validation.
#[derive(Debug, Clone, thiserror::Error)]
pub enum PostgresConnectionValidationError {
    /// Fewer replication slots are available than required. `count` is the
    /// number of slots required.
    #[error("PostgreSQL server has insufficient number of replication slots available")]
    InsufficientReplicationSlotsAvailable { count: usize },
    /// The server's `wal_level` is lower than `logical`.
    #[error("server must have wal_level >= logical, but has {wal_level}")]
    InsufficientWalLevel {
        wal_level: mz_postgres_util::replication::WalLevel,
    },
    /// Replication is disabled; validation reports this when
    /// `max_wal_senders` is below 1.
    #[error("replication disabled on server")]
    ReplicationDisabled,
}
1971
1972impl PostgresConnectionValidationError {
1973 pub fn detail(&self) -> Option<String> {
1974 match self {
1975 Self::InsufficientReplicationSlotsAvailable { count } => Some(format!(
1976 "executing this statement requires {} replication slot{}",
1977 count,
1978 if *count == 1 { "" } else { "s" }
1979 )),
1980 _ => None,
1981 }
1982 }
1983
1984 pub fn hint(&self) -> Option<String> {
1985 match self {
1986 Self::InsufficientReplicationSlotsAvailable { .. } => Some(
1987 "you might be able to wait for other sources to finish snapshotting and try again"
1988 .into(),
1989 ),
1990 Self::ReplicationDisabled => Some("set max_wal_senders to a value > 0".into()),
1991 Self::InsufficientWalLevel { .. } => None,
1992 }
1993 }
1994}
1995
1996impl<C: ConnectionAccess> AlterCompatible for PostgresConnection<C> {
1997 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1998 let PostgresConnection {
1999 tunnel,
2000 host: _,
2002 port: _,
2003 database: _,
2004 user: _,
2005 password: _,
2006 tls_mode: _,
2007 tls_root_cert: _,
2008 tls_identity: _,
2009 } = self;
2010
2011 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2012
2013 for (compatible, field) in compatibility_checks {
2014 if !compatible {
2015 tracing::warn!(
2016 "PostgresConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2017 self,
2018 other
2019 );
2020
2021 return Err(AlterError { id });
2022 }
2023 }
2024 Ok(())
2025 }
2026}
2027
/// How a connection reaches its remote endpoint.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Tunnel<C: ConnectionAccess = InlinedConnection> {
    /// Connect to the remote host's resolved addresses directly.
    Direct,
    /// Connect through an SSH tunnel.
    Ssh(SshTunnel<C>),
    /// Connect through a single AWS PrivateLink endpoint.
    AwsPrivatelink(AwsPrivatelink),
    /// Route hosts to PrivateLink endpoints using matching rules. Kafka uses
    /// this only as its default tunnel. The schema registry, PostgreSQL, and
    /// MySQL code paths treat it as unreachable.
    AwsPrivatelinks(AwsPrivatelinks),
}
2039
2040impl<R: ConnectionResolver> IntoInlineConnection<Tunnel, R> for Tunnel<ReferencedConnection> {
2041 fn into_inline_connection(self, r: R) -> Tunnel {
2042 match self {
2043 Tunnel::Direct => Tunnel::Direct,
2044 Tunnel::Ssh(ssh) => Tunnel::Ssh(ssh.into_inline_connection(r)),
2045 Tunnel::AwsPrivatelink(awspl) => Tunnel::AwsPrivatelink(awspl),
2046 Tunnel::AwsPrivatelinks(x) => Tunnel::AwsPrivatelinks(x),
2047 }
2048 }
2049}
2050
2051impl<C: ConnectionAccess> AlterCompatible for Tunnel<C> {
2052 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2053 let compatible = match (self, other) {
2054 (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o).is_ok(),
2055 (s, o) => s == o,
2056 };
2057
2058 if !compatible {
2059 tracing::warn!(
2060 "Tunnel incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
2061 self,
2062 other
2063 );
2064
2065 return Err(AlterError { id });
2066 }
2067
2068 Ok(())
2069 }
2070}
2071
/// TLS mode for MySQL connections.
///
/// `MySqlConnection::config` maps each mode onto `mysql_async` SSL options.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum MySqlSslMode {
    /// No TLS.
    Disabled,
    /// TLS without verifying the certificate or the server's domain.
    Required,
    /// TLS that verifies the certificate but skips domain validation.
    VerifyCa,
    /// TLS with default options; neither verification step is disabled.
    VerifyIdentity,
}
2082
/// Connection details for a MySQL server.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct MySqlConnection<C: ConnectionAccess = InlinedConnection> {
    /// Server host name or IP address.
    pub host: String,
    /// Server TCP port.
    pub port: u16,
    /// User name, given inline or as a secret.
    pub user: StringOrSecret,
    /// ID of the secret that holds the password, if any.
    pub password: Option<CatalogItemId>,
    /// How the server is reached.
    pub tunnel: Tunnel<C>,
    /// TLS mode.
    pub tls_mode: MySqlSslMode,
    /// Root certificate to trust. Only used when `tls_mode` is `VerifyCa` or
    /// `VerifyIdentity`.
    pub tls_root_cert: Option<StringOrSecret>,
    /// Client TLS identity to present.
    pub tls_identity: Option<TlsIdentity>,
    /// Optional AWS connection, whose SDK config is passed to
    /// `mz_mysql_util::Config`. NOTE(review): presumably used for AWS
    /// (IAM-based) authentication; confirm.
    pub aws_connection: Option<AwsConnectionReference<C>>,
}
2107
2108impl<R: ConnectionResolver> IntoInlineConnection<MySqlConnection, R>
2109 for MySqlConnection<ReferencedConnection>
2110{
2111 fn into_inline_connection(self, r: R) -> MySqlConnection {
2112 let MySqlConnection {
2113 host,
2114 port,
2115 user,
2116 password,
2117 tunnel,
2118 tls_mode,
2119 tls_root_cert,
2120 tls_identity,
2121 aws_connection,
2122 } = self;
2123
2124 MySqlConnection {
2125 host,
2126 port,
2127 user,
2128 password,
2129 tunnel: tunnel.into_inline_connection(&r),
2130 tls_mode,
2131 tls_root_cert,
2132 tls_identity,
2133 aws_connection: aws_connection.map(|aws| aws.into_inline_connection(&r)),
2134 }
2135 }
2136}
2137
impl<C: ConnectionAccess> MySqlConnection<C> {
    /// Whether this connection is validated by default. Always `true`.
    fn validate_by_default(&self) -> bool {
        true
    }
}
2143
impl MySqlConnection<InlinedConnection> {
    /// Builds the configuration used to connect to this MySQL server.
    ///
    /// Credentials and TLS material are read through `secrets_reader`, or
    /// taken inline when given as plain strings. The tunnel is converted
    /// into a `mz_mysql_util::TunnelConfig`. If `aws_connection` is set, its
    /// AWS SDK config is loaded and passed along.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the following fails:
    /// - reading a secret;
    /// - converting the client identity to PKCS#12;
    /// - resolving an address;
    /// - decoding the SSH key pair;
    /// - loading the AWS config;
    /// - `mz_mysql_util::Config::new`.
    ///
    /// # Panics
    ///
    /// Panics if a PrivateLink tunnel specifies a port. Also panics for
    /// `Tunnel::AwsPrivatelinks`, which only Kafka connections support.
    pub async fn config(
        &self,
        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_mysql_util::Config, anyhow::Error> {
        let mut opts = mysql_async::OptsBuilder::default()
            .ip_or_hostname(&self.host)
            .tcp_port(self.port)
            .user(Some(&self.user.get_string(in_task, secrets_reader).await?));

        if let Some(password) = self.password {
            let password = secrets_reader
                .read_string_in_task_if(in_task, password)
                .await?;
            opts = opts.pass(Some(password));
        }

        // Map the TLS mode onto `mysql_async` SSL options:
        // - `Disabled`: no SSL options, so no TLS.
        // - `Required`: accept invalid certificates and skip domain validation.
        // - `VerifyCa`: verify the certificate but skip domain validation.
        // - `VerifyIdentity`: default options, nothing disabled.
        let mut ssl_opts = match self.tls_mode {
            MySqlSslMode::Disabled => None,
            MySqlSslMode::Required => Some(
                mysql_async::SslOpts::default()
                    .with_danger_accept_invalid_certs(true)
                    .with_danger_skip_domain_validation(true),
            ),
            MySqlSslMode::VerifyCa => {
                Some(mysql_async::SslOpts::default().with_danger_skip_domain_validation(true))
            }
            MySqlSslMode::VerifyIdentity => Some(mysql_async::SslOpts::default()),
        };

        // A custom root certificate is read and applied only in the modes
        // that verify the server's certificate.
        if matches!(
            self.tls_mode,
            MySqlSslMode::VerifyCa | MySqlSslMode::VerifyIdentity
        ) {
            if let Some(tls_root_cert) = &self.tls_root_cert {
                let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
                ssl_opts = ssl_opts.map(|opts| {
                    opts.with_root_certs(vec![tls_root_cert.as_bytes().to_vec().into()])
                });
            }
        }

        // The client identity is converted from PEM into PKCS#12 DER plus a
        // password, which is the form `mysql_async::ClientIdentity` accepts.
        // The secret is read whenever an identity is configured. It is only
        // attached when `ssl_opts` is `Some`, i.e. not in `Disabled` mode.
        if let Some(identity) = &self.tls_identity {
            let key = secrets_reader
                .read_string_in_task_if(in_task, identity.key)
                .await?;
            let cert = identity.cert.get_string(in_task, secrets_reader).await?;
            let (der, pass) =
                mz_tls_util::pkcs12der_from_pem(key.as_bytes(), cert.as_bytes())?.into_parts();

            ssl_opts = ssl_opts.map(|opts| {
                opts.with_client_identity(Some(
                    mysql_async::ClientIdentity::new(der.into()).with_password(pass),
                ))
            });
        }

        opts = opts.ssl_opts(ssl_opts);

        // Work out how to reach the server. Host names are resolved with
        // `resolve_address`, which is given the `ENFORCE_EXTERNAL_ADDRESSES`
        // setting.
        let tunnel = match &self.tunnel {
            Tunnel::Direct => {
                let resolved = resolve_address(
                    &self.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_mysql_util::TunnelConfig::Direct {
                    resolved_ips: Some(resolved),
                }
            }
            Tunnel::Ssh(SshTunnel {
                connection_id,
                connection,
            }) => {
                // The SSH key pair is stored in the secret keyed by the SSH
                // connection's ID.
                let secret = secrets_reader
                    .read_in_task_if(in_task, *connection_id)
                    .await?;
                let key_pair = SshKeyPair::from_bytes(&secret)?;
                let resolved = resolve_address(
                    &connection.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_mysql_util::TunnelConfig::Ssh {
                    config: SshTunnelConfig {
                        host: resolved
                            .iter()
                            .map(|a| a.to_string())
                            .collect::<BTreeSet<_>>(),
                        port: connection.port,
                        user: connection.user.clone(),
                        key_pair,
                    },
                }
            }
            Tunnel::AwsPrivatelink(connection) => {
                assert_none!(connection.port);
                mz_mysql_util::TunnelConfig::AwsPrivatelink {
                    connection_id: connection.connection_id,
                }
            }
            Tunnel::AwsPrivatelinks(_) => {
                unreachable!("MATCHING broker rules are only available for Kafka connections.");
            }
        };

        // Load the AWS SDK config only when an AWS connection is attached.
        let aws_config = match self.aws_connection.as_ref() {
            None => None,
            Some(aws_ref) => Some(
                aws_ref
                    .connection
                    .load_sdk_config(
                        &storage_configuration.connection_context,
                        aws_ref.connection_id,
                        in_task,
                        ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                    )
                    .await?,
            ),
        };

        Ok(mz_mysql_util::Config::new(
            opts,
            tunnel,
            storage_configuration.parameters.ssh_timeout_config,
            in_task,
            storage_configuration
                .parameters
                .mysql_source_timeouts
                .clone(),
            aws_config,
        )?)
    }

    /// Validates the connection by connecting and checking the server's
    /// replication-related system settings.
    ///
    /// Three checks always run: GTID consistency, full-row binlog format, and
    /// replication commit order. All `InvalidSystemSetting` failures are
    /// collected and reported together as `ReplicationSettingsError`. Any
    /// other error is returned, the first one in check order. On success,
    /// returns the open connection.
    pub async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<MySqlConn, MySqlConnectionValidationError> {
        let config = self
            .config(
                &storage_configuration.connection_context.secrets_reader,
                storage_configuration,
                InTask::No,
            )
            .await?;
        let mut conn = config
            .connect(
                "connection validation",
                &storage_configuration.connection_context.ssh_tunnel_manager,
            )
            .await?;

        // Each entry is (setting, expected, actual).
        let mut setting_errors = vec![];
        let gtid_res = mz_mysql_util::ensure_gtid_consistency(&mut conn).await;
        let binlog_res = mz_mysql_util::ensure_full_row_binlog_format(&mut conn).await;
        let order_res = mz_mysql_util::ensure_replication_commit_order(&mut conn).await;
        for res in [gtid_res, binlog_res, order_res] {
            match res {
                Err(MySqlError::InvalidSystemSetting {
                    setting,
                    expected,
                    actual,
                }) => {
                    setting_errors.push((setting, expected, actual));
                }
                Err(err) => Err(err)?,
                Ok(()) => {}
            }
        }
        if !setting_errors.is_empty() {
            Err(MySqlConnectionValidationError::ReplicationSettingsError(
                setting_errors,
            ))?;
        }

        Ok(conn)
    }
}
2335
/// Errors returned by `MySqlConnection::validate`.
#[derive(Debug, thiserror::Error)]
pub enum MySqlConnectionValidationError {
    /// One or more replication-related system settings are wrong. Each entry
    /// is `(setting, expected, actual)`.
    #[error("Invalid MySQL system replication settings")]
    ReplicationSettingsError(Vec<(String, String, String)>),
    /// An error from the MySQL client utilities.
    #[error(transparent)]
    Client(#[from] MySqlError),
    /// Any other error, displayed together with its chain of causes.
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}
2345
2346impl MySqlConnectionValidationError {
2347 pub fn detail(&self) -> Option<String> {
2348 match self {
2349 Self::ReplicationSettingsError(settings) => Some(format!(
2350 "Invalid MySQL system replication settings: {}",
2351 itertools::join(
2352 settings.iter().map(|(setting, expected, actual)| format!(
2353 "{}: expected {}, got {}",
2354 setting, expected, actual
2355 )),
2356 "; "
2357 )
2358 )),
2359 _ => None,
2360 }
2361 }
2362
2363 pub fn hint(&self) -> Option<String> {
2364 match self {
2365 Self::ReplicationSettingsError(_) => {
2366 Some("Set the necessary MySQL database system settings.".into())
2367 }
2368 _ => None,
2369 }
2370 }
2371}
2372
2373impl<C: ConnectionAccess> AlterCompatible for MySqlConnection<C> {
2374 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2375 let MySqlConnection {
2376 tunnel,
2377 host: _,
2379 port: _,
2380 user: _,
2381 password: _,
2382 tls_mode: _,
2383 tls_root_cert: _,
2384 tls_identity: _,
2385 aws_connection: _,
2386 } = self;
2387
2388 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2389
2390 for (compatible, field) in compatibility_checks {
2391 if !compatible {
2392 tracing::warn!(
2393 "MySqlConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2394 self,
2395 other
2396 );
2397
2398 return Err(AlterError { id });
2399 }
2400 }
2401 Ok(())
2402 }
2403}
2404
/// Connection details for a SQL Server instance.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SqlServerConnectionDetails<C: ConnectionAccess = InlinedConnection> {
    /// Server host; passed to `tiberius::Config::host`.
    pub host: String,
    /// Server port; passed to `tiberius::Config::port`.
    pub port: u16,
    /// Database to connect to; passed to `tiberius::Config::database`.
    pub database: String,
    /// User name, given inline or as a secret.
    pub user: StringOrSecret,
    /// Catalog ID for the password. NOTE(review): presumably a secret ID, as
    /// in the other connection types; confirm in `resolve_config`.
    pub password: CatalogItemId,
    /// How the server is reached.
    pub tunnel: Tunnel<C>,
    /// Encryption level; converted into the `tiberius` encryption setting.
    pub encryption: mz_sql_server_util::config::EncryptionLevel,
    /// Certificate validation policy
    /// (`mz_sql_server_util::config::CertificateValidationPolicy`).
    pub certificate_validation_policy: mz_sql_server_util::config::CertificateValidationPolicy,
    /// Optional root certificate, given inline or as a secret.
    pub tls_root_cert: Option<StringOrSecret>,
}
2432
2433impl<C: ConnectionAccess> SqlServerConnectionDetails<C> {
2434 fn validate_by_default(&self) -> bool {
2435 true
2436 }
2437}
2438
2439impl SqlServerConnectionDetails<InlinedConnection> {
2440 pub async fn validate(
2442 &self,
2443 _id: CatalogItemId,
2444 storage_configuration: &StorageConfiguration,
2445 ) -> Result<mz_sql_server_util::Client, anyhow::Error> {
2446 let config = self
2447 .resolve_config(
2448 &storage_configuration.connection_context.secrets_reader,
2449 storage_configuration,
2450 InTask::No,
2451 )
2452 .await?;
2453 tracing::debug!(?config, "Validating SQL Server connection");
2454
2455 let mut client = mz_sql_server_util::Client::connect(config).await?;
2456
2457 let mut replication_errors = vec![];
2462 for error in [
2463 mz_sql_server_util::inspect::ensure_database_cdc_enabled(&mut client).await,
2464 mz_sql_server_util::inspect::ensure_snapshot_isolation_enabled(&mut client).await,
2465 mz_sql_server_util::inspect::ensure_sql_server_agent_running(&mut client).await,
2466 ] {
2467 match error {
2468 Err(mz_sql_server_util::SqlServerError::InvalidSystemSetting {
2469 name,
2470 expected,
2471 actual,
2472 }) => replication_errors.push((name, expected, actual)),
2473 Err(other) => Err(other)?,
2474 Ok(()) => (),
2475 }
2476 }
2477 if !replication_errors.is_empty() {
2478 Err(SqlServerConnectionValidationError::ReplicationSettingsError(replication_errors))?;
2479 }
2480
2481 Ok(client)
2482 }
2483
2484 pub async fn resolve_config(
2494 &self,
2495 secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
2496 storage_configuration: &StorageConfiguration,
2497 in_task: InTask,
2498 ) -> Result<mz_sql_server_util::Config, anyhow::Error> {
2499 let dyncfg = storage_configuration.config_set();
2500 let mut inner_config = tiberius::Config::new();
2501
2502 inner_config.host(&self.host);
2504 inner_config.port(self.port);
2505 inner_config.database(self.database.clone());
2506 inner_config.encryption(self.encryption.into());
2507 match self.certificate_validation_policy {
2508 mz_sql_server_util::config::CertificateValidationPolicy::TrustAll => {
2509 inner_config.trust_cert()
2510 }
2511 mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA => {
2512 inner_config.trust_cert_ca_pem(
2513 self.tls_root_cert
2514 .as_ref()
2515 .unwrap()
2516 .get_string(in_task, secrets_reader)
2517 .await
2518 .context("ca certificate")?,
2519 );
2520 }
2521 mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem => (), }
2523
2524 inner_config.application_name("materialize");
2525
2526 let user = self
2528 .user
2529 .get_string(in_task, secrets_reader)
2530 .await
2531 .context("username")?;
2532 let password = secrets_reader
2533 .read_string_in_task_if(in_task, self.password)
2534 .await
2535 .context("password")?;
2536 inner_config.authentication(tiberius::AuthMethod::sql_server(user, password));
2539
2540 let enforce_external_addresses = ENFORCE_EXTERNAL_ADDRESSES.get(dyncfg);
2543
2544 let tunnel = match &self.tunnel {
2545 Tunnel::Direct => {
2546 let resolved_addresses: Vec<SocketAddr> =
2547 resolve_address(&self.host, enforce_external_addresses)
2548 .await?
2549 .into_iter()
2550 .map(|ip| SocketAddr::new(ip, self.port))
2551 .collect();
2552 mz_sql_server_util::config::TunnelConfig::Direct {
2553 resolved_addresses: resolved_addresses.into_boxed_slice(),
2554 }
2555 }
2556 Tunnel::Ssh(SshTunnel {
2557 connection_id,
2558 connection: ssh_connection,
2559 }) => {
2560 let secret = secrets_reader
2561 .read_in_task_if(in_task, *connection_id)
2562 .await
2563 .context("ssh secret")?;
2564 let key_pair = SshKeyPair::from_bytes(&secret).context("ssh key pair")?;
2565 let addresses = resolve_address(&ssh_connection.host, enforce_external_addresses)
2568 .await
2569 .context("ssh tunnel")?;
2570
2571 let config = SshTunnelConfig {
2572 host: addresses.into_iter().map(|a| a.to_string()).collect(),
2573 port: ssh_connection.port,
2574 user: ssh_connection.user.clone(),
2575 key_pair,
2576 };
2577 mz_sql_server_util::config::TunnelConfig::Ssh {
2578 config,
2579 manager: storage_configuration
2580 .connection_context
2581 .ssh_tunnel_manager
2582 .clone(),
2583 timeout: storage_configuration.parameters.ssh_timeout_config.clone(),
2584 host: self.host.clone(),
2585 port: self.port,
2586 }
2587 }
2588 Tunnel::AwsPrivatelink(private_link_connection) => {
2589 assert_none!(private_link_connection.port);
2590 mz_sql_server_util::config::TunnelConfig::AwsPrivatelink {
2591 connection_id: private_link_connection.connection_id,
2592 port: self.port,
2593 }
2594 }
2595 Tunnel::AwsPrivatelinks(_) => {
2596 unreachable!("MATCHING broker rules are only available for Kafka connections.");
2597 }
2598 };
2599
2600 Ok(mz_sql_server_util::Config::new(
2601 inner_config,
2602 tunnel,
2603 in_task,
2604 ))
2605 }
2606}
2607
/// Validation failures specific to SQL Server connections.
#[derive(Debug, Clone, thiserror::Error)]
pub enum SqlServerConnectionValidationError {
    /// One or more server settings required for replication have the wrong
    /// value. Each entry is `(setting name, expected value, actual value)`.
    #[error("Invalid SQL Server system replication settings")]
    ReplicationSettingsError(Vec<(String, String, String)>),
}
2613
2614impl SqlServerConnectionValidationError {
2615 pub fn detail(&self) -> Option<String> {
2616 match self {
2617 Self::ReplicationSettingsError(settings) => Some(format!(
2618 "Invalid SQL Server system replication settings: {}",
2619 itertools::join(
2620 settings.iter().map(|(setting, expected, actual)| format!(
2621 "{}: expected {}, got {}",
2622 setting, expected, actual
2623 )),
2624 "; "
2625 )
2626 )),
2627 }
2628 }
2629
2630 pub fn hint(&self) -> Option<String> {
2631 match self {
2632 _ => None,
2633 }
2634 }
2635}
2636
2637impl<R: ConnectionResolver> IntoInlineConnection<SqlServerConnectionDetails, R>
2638 for SqlServerConnectionDetails<ReferencedConnection>
2639{
2640 fn into_inline_connection(self, r: R) -> SqlServerConnectionDetails {
2641 let SqlServerConnectionDetails {
2642 host,
2643 port,
2644 database,
2645 user,
2646 password,
2647 tunnel,
2648 encryption,
2649 certificate_validation_policy,
2650 tls_root_cert,
2651 } = self;
2652
2653 SqlServerConnectionDetails {
2654 host,
2655 port,
2656 database,
2657 user,
2658 password,
2659 tunnel: tunnel.into_inline_connection(&r),
2660 encryption,
2661 certificate_validation_policy,
2662 tls_root_cert,
2663 }
2664 }
2665}
2666
2667impl<C: ConnectionAccess> AlterCompatible for SqlServerConnectionDetails<C> {
2668 fn alter_compatible(
2669 &self,
2670 id: mz_repr::GlobalId,
2671 other: &Self,
2672 ) -> Result<(), crate::controller::AlterError> {
2673 let SqlServerConnectionDetails {
2674 tunnel,
2675 host: _,
2677 port: _,
2678 database: _,
2679 user: _,
2680 password: _,
2681 encryption: _,
2682 certificate_validation_policy: _,
2683 tls_root_cert: _,
2684 } = self;
2685
2686 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2687
2688 for (compatible, field) in compatibility_checks {
2689 if !compatible {
2690 tracing::warn!(
2691 "SqlServerConnectionDetails incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2692 self,
2693 other
2694 );
2695
2696 return Err(AlterError { id });
2697 }
2698 }
2699 Ok(())
2700 }
2701}
2702
/// An SSH server that other connections can tunnel through.
///
/// The key pair is not stored here. `SshConnection::validate` reads it from
/// the secrets reader using the connection's catalog ID.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SshConnection {
    /// Hostname of the SSH server; resolved to addresses before connecting.
    pub host: String,
    /// Port of the SSH server.
    pub port: u16,
    /// User to authenticate as.
    pub user: String,
}
2710
2711use self::inline::{
2712 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
2713 ReferencedConnection,
2714};
2715
2716impl AlterCompatible for SshConnection {
2717 fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
2718 Ok(())
2720 }
2721}
2722
/// A tunnel through an AWS PrivateLink connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelink {
    /// Catalog ID of the AWS PrivateLink connection to tunnel through. This
    /// is the only field that must stay the same across an alter.
    pub connection_id: CatalogItemId,
    /// Optional availability zone. NOTE(review): its exact use is not visible
    /// here; confirm against the Kafka/Postgres tunnel code.
    pub availability_zone: Option<String>,
    /// Optional port override. SQL Server connections assert this is `None`
    /// and use their own port instead.
    pub port: Option<u16>,
}
2734
2735impl AlterCompatible for AwsPrivatelink {
2736 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2737 let AwsPrivatelink {
2738 connection_id,
2739 availability_zone: _,
2740 port: _,
2741 } = self;
2742
2743 let compatibility_checks = [(connection_id == &other.connection_id, "connection_id")];
2744
2745 for (compatible, field) in compatibility_checks {
2746 if !compatible {
2747 tracing::warn!(
2748 "AwsPrivatelink incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2749 self,
2750 other
2751 );
2752
2753 return Err(AlterError { id });
2754 }
2755 }
2756
2757 Ok(())
2758 }
2759}
2760
/// A rule-based set of AWS PrivateLink tunnels.
///
/// NOTE(review): SQL Server connections treat this tunnel kind as
/// unreachable ("MATCHING broker rules are only available for Kafka
/// connections"), so it is presumably Kafka-only; confirm.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelinks {
    /// Rules pairing a pattern with the PrivateLink endpoint to use.
    pub rules: Vec<AwsPrivatelinkRule>,
}
2768
/// One rule of an [`AwsPrivatelinks`] tunnel.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelinkRule {
    /// Pattern this rule matches. NOTE(review): presumably matched against
    /// broker addresses; confirm against the Kafka tunnel code.
    pub pattern: ConnectionRulePattern,
    /// PrivateLink endpoint used when `pattern` matches.
    pub to: AwsPrivatelink,
}
2776
/// A tunnel through an SSH connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SshTunnel<C: ConnectionAccess = InlinedConnection> {
    /// Catalog ID of the SSH connection. The tunnel's key pair is read from
    /// the secrets reader under this ID.
    pub connection_id: CatalogItemId,
    /// The SSH connection. With `InlinedConnection` this is the inlined
    /// [`SshConnection`]; with `ReferencedConnection` it is presumably a
    /// reference, resolved by `into_inline_connection`.
    pub connection: C::Ssh,
}
2785
2786impl<R: ConnectionResolver> IntoInlineConnection<SshTunnel, R> for SshTunnel<ReferencedConnection> {
2787 fn into_inline_connection(self, r: R) -> SshTunnel {
2788 let SshTunnel {
2789 connection,
2790 connection_id,
2791 } = self;
2792
2793 SshTunnel {
2794 connection: r.resolve_connection(connection).unwrap_ssh(),
2795 connection_id,
2796 }
2797 }
2798}
2799
2800impl SshTunnel<InlinedConnection> {
2801 async fn connect(
2804 &self,
2805 storage_configuration: &StorageConfiguration,
2806 remote_host: &str,
2807 remote_port: u16,
2808 in_task: InTask,
2809 ) -> Result<ManagedSshTunnelHandle, anyhow::Error> {
2810 let resolved = resolve_address(
2812 &self.connection.host,
2813 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2814 )
2815 .await?;
2816 storage_configuration
2817 .connection_context
2818 .ssh_tunnel_manager
2819 .connect(
2820 SshTunnelConfig {
2821 host: resolved
2822 .iter()
2823 .map(|a| a.to_string())
2824 .collect::<BTreeSet<_>>(),
2825 port: self.connection.port,
2826 user: self.connection.user.clone(),
2827 key_pair: SshKeyPair::from_bytes(
2828 &storage_configuration
2829 .connection_context
2830 .secrets_reader
2831 .read_in_task_if(in_task, self.connection_id)
2832 .await?,
2833 )?,
2834 },
2835 remote_host,
2836 remote_port,
2837 storage_configuration.parameters.ssh_timeout_config,
2838 in_task,
2839 )
2840 .await
2841 }
2842}
2843
2844impl<C: ConnectionAccess> AlterCompatible for SshTunnel<C> {
2845 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2846 let SshTunnel {
2847 connection_id,
2848 connection,
2849 } = self;
2850
2851 let compatibility_checks = [
2852 (connection_id == &other.connection_id, "connection_id"),
2853 (
2854 connection.alter_compatible(id, &other.connection).is_ok(),
2855 "connection",
2856 ),
2857 ];
2858
2859 for (compatible, field) in compatibility_checks {
2860 if !compatible {
2861 tracing::warn!(
2862 "SshTunnel incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2863 self,
2864 other
2865 );
2866
2867 return Err(AlterError { id });
2868 }
2869 }
2870
2871 Ok(())
2872 }
2873}
2874
2875impl SshConnection {
2876 #[allow(clippy::unused_async)]
2877 async fn validate(
2878 &self,
2879 id: CatalogItemId,
2880 storage_configuration: &StorageConfiguration,
2881 ) -> Result<(), anyhow::Error> {
2882 let secret = storage_configuration
2883 .connection_context
2884 .secrets_reader
2885 .read_in_task_if(
2886 InTask::No,
2888 id,
2889 )
2890 .await?;
2891 let key_pair = SshKeyPair::from_bytes(&secret)?;
2892
2893 let resolved = resolve_address(
2895 &self.host,
2896 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2897 )
2898 .await?;
2899
2900 let config = SshTunnelConfig {
2901 host: resolved
2902 .iter()
2903 .map(|a| a.to_string())
2904 .collect::<BTreeSet<_>>(),
2905 port: self.port,
2906 user: self.user.clone(),
2907 key_pair,
2908 };
2909 config
2912 .validate(storage_configuration.parameters.ssh_timeout_config)
2913 .await
2914 }
2915
2916 fn validate_by_default(&self) -> bool {
2917 false
2918 }
2919}
2920
2921impl AwsPrivatelinkConnection {
2922 #[allow(clippy::unused_async)]
2923 async fn validate(
2924 &self,
2925 id: CatalogItemId,
2926 storage_configuration: &StorageConfiguration,
2927 ) -> Result<(), anyhow::Error> {
2928 let Some(ref cloud_resource_reader) = storage_configuration
2929 .connection_context
2930 .cloud_resource_reader
2931 else {
2932 return Err(anyhow!("AWS PrivateLink connections are unsupported"));
2933 };
2934
2935 let status = cloud_resource_reader.read(id).await?;
2937
2938 let availability = status
2939 .conditions
2940 .as_ref()
2941 .and_then(|conditions| conditions.iter().find(|c| c.type_ == "Available"));
2942
2943 match availability {
2944 Some(condition) if condition.status == "True" => Ok(()),
2945 Some(condition) => Err(anyhow!("{}", condition.message)),
2946 None => Err(anyhow!("Endpoint availability is unknown")),
2947 }
2948 }
2949
2950 fn validate_by_default(&self) -> bool {
2951 false
2952 }
2953}