1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::net::SocketAddr;
15use std::sync::Arc;
16
17use anyhow::{Context, anyhow};
18use async_trait::async_trait;
19use aws_credential_types::provider::ProvideCredentials;
20use iceberg::Catalog;
21use iceberg::CatalogBuilder;
22use iceberg::io::{
23 AwsCredential, AwsCredentialLoad, CustomAwsCredentialLoader, S3_ACCESS_KEY_ID,
24 S3_DISABLE_EC2_METADATA, S3_REGION, S3_SECRET_ACCESS_KEY,
25};
26use iceberg_catalog_rest::{
27 REST_CATALOG_PROP_URI, REST_CATALOG_PROP_WAREHOUSE, RestCatalogBuilder,
28};
29use itertools::Itertools;
30use mz_ccsr::tls::{Certificate, Identity};
31use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceReader, vpc_endpoint_host};
32use mz_dyncfg::ConfigSet;
33use mz_kafka_util::client::{
34 BrokerAddr, BrokerRewrite, MzClientContext, MzKafkaError, TunnelConfig, TunnelingClientContext,
35};
36use mz_mysql_util::{MySqlConn, MySqlError};
37use mz_ore::assert_none;
38use mz_ore::error::ErrorExt;
39use mz_ore::future::{InTask, OreFutureExt};
40use mz_ore::netio::resolve_address;
41use mz_ore::num::NonNeg;
42use mz_repr::{CatalogItemId, GlobalId};
43use mz_secrets::SecretsReader;
44use mz_ssh_util::keys::SshKeyPair;
45use mz_ssh_util::tunnel::SshTunnelConfig;
46use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager};
47use mz_tls_util::Pkcs12Archive;
48use mz_tracing::CloneableEnvFilter;
49use rdkafka::ClientContext;
50use rdkafka::config::FromClientConfigAndContext;
51use rdkafka::consumer::{BaseConsumer, Consumer};
52use regex::Regex;
53use serde::{Deserialize, Deserializer, Serialize};
54use tokio::net;
55use tokio::runtime::Handle;
56use tokio_postgres::config::SslMode;
57use tracing::{debug, warn};
58use url::Url;
59
60use crate::AlterCompatible;
61use crate::configuration::StorageConfiguration;
62use crate::connections::aws::{
63 AwsAuth, AwsConnection, AwsConnectionReference, AwsConnectionValidationError,
64};
65use crate::connections::string_or_secret::StringOrSecret;
66use crate::controller::AlterError;
67use crate::dyncfgs::{
68 ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES,
69 KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM, KAFKA_RECONNECT_BACKOFF,
70 KAFKA_RECONNECT_BACKOFF_MAX, KAFKA_RETRY_BACKOFF, KAFKA_RETRY_BACKOFF_MAX,
71};
72use crate::errors::{ContextCreationError, CsrConnectError};
73
pub mod aws;
pub mod inline;
pub mod string_or_secret;

/// Property name for the OAuth scope passed to an Iceberg REST catalog.
const REST_CATALOG_PROP_SCOPE: &str = "scope";
/// Property name for the client credential passed to an Iceberg REST catalog.
const REST_CATALOG_PROP_CREDENTIAL: &str = "credential";
80
/// Bridges an AWS SDK credentials provider into the `iceberg` crate's
/// credential-loading interface, so the catalog's FileIO can obtain
/// credentials via the SDK's provider chain.
struct AwsSdkCredentialLoader {
    /// Shared handle to the SDK provider used to resolve credentials.
    provider: aws_credential_types::provider::SharedCredentialsProvider,
}

impl AwsSdkCredentialLoader {
    /// Wraps the given SDK credentials provider.
    fn new(provider: aws_credential_types::provider::SharedCredentialsProvider) -> Self {
        Self { provider }
    }
}
99
100#[async_trait]
101impl AwsCredentialLoad for AwsSdkCredentialLoader {
102 async fn load_credential(
103 &self,
104 _client: reqwest::Client,
105 ) -> anyhow::Result<Option<AwsCredential>> {
106 let creds = self
107 .provider
108 .provide_credentials()
109 .await
110 .map_err(|e| {
111 warn!(
112 error = %e.display_with_causes(),
113 "failed to load AWS credentials for Iceberg FileIO from SDK provider"
114 );
115 e
116 })
117 .context(
118 "failed to load AWS credentials from SDK provider for Iceberg FileIO \
119 (credential source may be temporarily unavailable)",
120 )?;
121
122 Ok(Some(AwsCredential {
123 access_key_id: creds.access_key_id().to_string(),
124 secret_access_key: creds.secret_access_key().to_string(),
125 session_token: creds.session_token().map(|s| s.to_string()),
126 expires_in: creds.expiry().map(|t| t.into()),
127 }))
128 }
129}
130
/// Convenience extension for reading secrets, optionally moving the read onto
/// a dedicated task (see `InTask`).
#[async_trait::async_trait]
trait SecretsReaderExt {
    /// Reads a secret's raw bytes, running the read in a separate task if
    /// `in_task` requests it.
    async fn read_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<Vec<u8>, anyhow::Error>;

    /// Reads a secret decoded as a string, running the read in a separate
    /// task if `in_task` requests it.
    async fn read_string_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<String, anyhow::Error>;
}
148
#[async_trait::async_trait]
impl SecretsReaderExt for Arc<dyn SecretsReader> {
    async fn read_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<Vec<u8>, anyhow::Error> {
        // Clone the Arc so the future can own the reader when moved to a task.
        let sr = Arc::clone(self);
        async move { sr.read(id).await }
            .run_in_task_if(in_task, || "secrets_reader_read".to_string())
            .await
    }
    async fn read_string_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<String, anyhow::Error> {
        // Same pattern as above, but decoding the secret as a string.
        let sr = Arc::clone(self);
        async move { sr.read_string(id).await }
            .run_in_task_if(in_task, || "secrets_reader_read".to_string())
            .await
    }
}
172
/// Environment-wide context threaded through connection establishment:
/// readers, tunnel management, and AWS/Kafka settings.
#[derive(Debug, Clone)]
pub struct ConnectionContext {
    /// Identifier of the environment; used to namespace derived names such as
    /// Kafka client IDs and progress topics.
    pub environment_id: String,
    /// Log level applied to librdkafka's internal logging.
    pub librdkafka_log_level: tracing::Level,
    /// Prefix for AWS external IDs, if configured.
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    /// Role ARN for AWS connections, if configured.
    pub aws_connection_role_arn: Option<String>,
    /// Reader for secrets referenced by connections.
    pub secrets_reader: Arc<dyn SecretsReader>,
    /// Reader for cloud resources (e.g. VPC endpoints), when available.
    pub cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
    /// Manager that shares SSH tunnels across connections.
    pub ssh_tunnel_manager: SshTunnelManager,
}
200
impl ConnectionContext {
    /// Builds a `ConnectionContext` from CLI-style arguments; the librdkafka
    /// log level is derived from the startup log filter.
    pub fn from_cli_args(
        environment_id: String,
        startup_log_level: &CloneableEnvFilter,
        aws_external_id_prefix: Option<AwsExternalIdPrefix>,
        aws_connection_role_arn: Option<String>,
        secrets_reader: Arc<dyn SecretsReader>,
        cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
    ) -> ConnectionContext {
        ConnectionContext {
            environment_id,
            // Derive librdkafka's level from the "librdkafka" target in the
            // startup filter.
            librdkafka_log_level: mz_ore::tracing::crate_level(
                &startup_log_level.clone().into(),
                "librdkafka",
            ),
            aws_external_id_prefix,
            aws_connection_role_arn,
            secrets_reader,
            cloud_resource_reader,
            ssh_tunnel_manager: SshTunnelManager::default(),
        }
    }

    /// Builds a `ConnectionContext` with fixed test values; only the secrets
    /// reader is caller-provided.
    pub fn for_tests(secrets_reader: Arc<dyn SecretsReader>) -> ConnectionContext {
        ConnectionContext {
            environment_id: "test-environment-id".into(),
            librdkafka_log_level: tracing::Level::INFO,
            aws_external_id_prefix: Some(
                AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable(
                    "test-aws-external-id-prefix",
                )
                .expect("infallible"),
            ),
            aws_connection_role_arn: Some(
                "arn:aws:iam::123456789000:role/MaterializeConnection".into(),
            ),
            secrets_reader,
            cloud_resource_reader: None,
            ssh_tunnel_manager: SshTunnelManager::default(),
        }
    }
}
251
/// A connection to an external system, in all its supported flavors.
///
/// The `C` parameter controls whether nested connection references are stored
/// inline (`InlinedConnection`) or by reference (`ReferencedConnection`).
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Connection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaConnection<C>),
    Csr(CsrConnection<C>),
    Postgres(PostgresConnection<C>),
    Ssh(SshConnection),
    Aws(AwsConnection),
    AwsPrivatelink(AwsPrivatelinkConnection),
    MySql(MySqlConnection<C>),
    SqlServer(SqlServerConnectionDetails<C>),
    IcebergCatalog(IcebergCatalogConnection<C>),
}
264
265impl<R: ConnectionResolver> IntoInlineConnection<Connection, R>
266 for Connection<ReferencedConnection>
267{
268 fn into_inline_connection(self, r: R) -> Connection {
269 match self {
270 Connection::Kafka(kafka) => Connection::Kafka(kafka.into_inline_connection(r)),
271 Connection::Csr(csr) => Connection::Csr(csr.into_inline_connection(r)),
272 Connection::Postgres(pg) => Connection::Postgres(pg.into_inline_connection(r)),
273 Connection::Ssh(ssh) => Connection::Ssh(ssh),
274 Connection::Aws(aws) => Connection::Aws(aws),
275 Connection::AwsPrivatelink(awspl) => Connection::AwsPrivatelink(awspl),
276 Connection::MySql(mysql) => Connection::MySql(mysql.into_inline_connection(r)),
277 Connection::SqlServer(sql_server) => {
278 Connection::SqlServer(sql_server.into_inline_connection(r))
279 }
280 Connection::IcebergCatalog(iceberg) => {
281 Connection::IcebergCatalog(iceberg.into_inline_connection(r))
282 }
283 }
284 }
285}
286
287impl<C: ConnectionAccess> Connection<C> {
288 pub fn validate_by_default(&self) -> bool {
290 match self {
291 Connection::Kafka(conn) => conn.validate_by_default(),
292 Connection::Csr(conn) => conn.validate_by_default(),
293 Connection::Postgres(conn) => conn.validate_by_default(),
294 Connection::Ssh(conn) => conn.validate_by_default(),
295 Connection::Aws(conn) => conn.validate_by_default(),
296 Connection::AwsPrivatelink(conn) => conn.validate_by_default(),
297 Connection::MySql(conn) => conn.validate_by_default(),
298 Connection::SqlServer(conn) => conn.validate_by_default(),
299 Connection::IcebergCatalog(conn) => conn.validate_by_default(),
300 }
301 }
302}
303
304impl Connection<InlinedConnection> {
305 pub async fn validate(
307 &self,
308 id: CatalogItemId,
309 storage_configuration: &StorageConfiguration,
310 ) -> Result<(), ConnectionValidationError> {
311 match self {
312 Connection::Kafka(conn) => conn.validate(id, storage_configuration).await?,
313 Connection::Csr(conn) => conn.validate(id, storage_configuration).await?,
314 Connection::Postgres(conn) => {
315 conn.validate(id, storage_configuration).await?;
316 }
317 Connection::Ssh(conn) => conn.validate(id, storage_configuration).await?,
318 Connection::Aws(conn) => conn.validate(id, storage_configuration).await?,
319 Connection::AwsPrivatelink(conn) => conn.validate(id, storage_configuration).await?,
320 Connection::MySql(conn) => {
321 conn.validate(id, storage_configuration).await?;
322 }
323 Connection::SqlServer(conn) => {
324 conn.validate(id, storage_configuration).await?;
325 }
326 Connection::IcebergCatalog(conn) => conn.validate(id, storage_configuration).await?,
327 }
328 Ok(())
329 }
330
331 pub fn unwrap_kafka(self) -> <InlinedConnection as ConnectionAccess>::Kafka {
332 match self {
333 Self::Kafka(conn) => conn,
334 o => unreachable!("{o:?} is not a Kafka connection"),
335 }
336 }
337
338 pub fn unwrap_pg(self) -> <InlinedConnection as ConnectionAccess>::Pg {
339 match self {
340 Self::Postgres(conn) => conn,
341 o => unreachable!("{o:?} is not a Postgres connection"),
342 }
343 }
344
345 pub fn unwrap_mysql(self) -> <InlinedConnection as ConnectionAccess>::MySql {
346 match self {
347 Self::MySql(conn) => conn,
348 o => unreachable!("{o:?} is not a MySQL connection"),
349 }
350 }
351
352 pub fn unwrap_sql_server(self) -> <InlinedConnection as ConnectionAccess>::SqlServer {
353 match self {
354 Self::SqlServer(conn) => conn,
355 o => unreachable!("{o:?} is not a SQL Server connection"),
356 }
357 }
358
359 pub fn unwrap_aws(self) -> <InlinedConnection as ConnectionAccess>::Aws {
360 match self {
361 Self::Aws(conn) => conn,
362 o => unreachable!("{o:?} is not an AWS connection"),
363 }
364 }
365
366 pub fn unwrap_ssh(self) -> <InlinedConnection as ConnectionAccess>::Ssh {
367 match self {
368 Self::Ssh(conn) => conn,
369 o => unreachable!("{o:?} is not an SSH connection"),
370 }
371 }
372
373 pub fn unwrap_csr(self) -> <InlinedConnection as ConnectionAccess>::Csr {
374 match self {
375 Self::Csr(conn) => conn,
376 o => unreachable!("{o:?} is not a Kafka connection"),
377 }
378 }
379
380 pub fn unwrap_iceberg_catalog(self) -> <InlinedConnection as ConnectionAccess>::IcebergCatalog {
381 match self {
382 Self::IcebergCatalog(conn) => conn,
383 o => unreachable!("{o:?} is not an Iceberg catalog connection"),
384 }
385 }
386}
387
/// An error returned when validating a connection.
#[derive(thiserror::Error, Debug)]
pub enum ConnectionValidationError {
    #[error(transparent)]
    Postgres(#[from] PostgresConnectionValidationError),
    #[error(transparent)]
    MySql(#[from] MySqlConnectionValidationError),
    #[error(transparent)]
    SqlServer(#[from] SqlServerConnectionValidationError),
    #[error(transparent)]
    Aws(#[from] AwsConnectionValidationError),
    /// Catch-all for validation failures without a dedicated error type.
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}
402
403impl ConnectionValidationError {
404 pub fn detail(&self) -> Option<String> {
406 match self {
407 ConnectionValidationError::Postgres(e) => e.detail(),
408 ConnectionValidationError::MySql(e) => e.detail(),
409 ConnectionValidationError::SqlServer(e) => e.detail(),
410 ConnectionValidationError::Aws(e) => e.detail(),
411 ConnectionValidationError::Other(_) => None,
412 }
413 }
414
415 pub fn hint(&self) -> Option<String> {
417 match self {
418 ConnectionValidationError::Postgres(e) => e.hint(),
419 ConnectionValidationError::MySql(e) => e.hint(),
420 ConnectionValidationError::SqlServer(e) => e.hint(),
421 ConnectionValidationError::Aws(e) => e.hint(),
422 ConnectionValidationError::Other(_) => None,
423 }
424 }
425}
426
impl<C: ConnectionAccess> AlterCompatible for Connection<C> {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        match (self, other) {
            (Self::Aws(s), Self::Aws(o)) => s.alter_compatible(id, o),
            (Self::AwsPrivatelink(s), Self::AwsPrivatelink(o)) => s.alter_compatible(id, o),
            (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o),
            (Self::Csr(s), Self::Csr(o)) => s.alter_compatible(id, o),
            (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
            (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
            (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
            // NOTE(review): mismatched variant pairs, `SqlServer`, and
            // `IcebergCatalog` all land in this arm and are reported
            // incompatible — confirm the latter two are intentionally
            // excluded from the delegating arms above.
            _ => {
                tracing::warn!(
                    "Connection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );
                Err(AlterError { id })
            }
        }
    }
}
448
/// Configuration for a generic Iceberg REST catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct RestIcebergCatalog {
    /// Client credential sent to the catalog (its `credential` property).
    pub credential: StringOrSecret,
    /// OAuth scope to request, if any (the `scope` property).
    pub scope: Option<String>,
    /// Warehouse to use, if any (the `warehouse` property).
    pub warehouse: Option<String>,
}
458
/// Configuration for an AWS S3 Tables Iceberg catalog accessed over REST.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct S3TablesRestIcebergCatalog<C: ConnectionAccess = InlinedConnection> {
    /// AWS connection used to authenticate with the catalog and S3.
    pub aws_connection: AwsConnectionReference<C>,
    /// Warehouse identifier passed to the catalog.
    pub warehouse: String,
}
466
467impl<R: ConnectionResolver> IntoInlineConnection<S3TablesRestIcebergCatalog, R>
468 for S3TablesRestIcebergCatalog<ReferencedConnection>
469{
470 fn into_inline_connection(self, r: R) -> S3TablesRestIcebergCatalog {
471 S3TablesRestIcebergCatalog {
472 aws_connection: self.aws_connection.into_inline_connection(&r),
473 warehouse: self.warehouse,
474 }
475 }
476}
477
/// Discriminant describing which kind of Iceberg catalog a connection uses.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogType {
    /// A generic Iceberg REST catalog.
    Rest,
    /// An AWS S3 Tables catalog accessed via its REST interface.
    S3TablesRest,
}
483
/// The catalog-specific configuration of an Iceberg catalog connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogImpl<C: ConnectionAccess = InlinedConnection> {
    /// Generic REST catalog configuration.
    Rest(RestIcebergCatalog),
    /// AWS S3 Tables REST catalog configuration.
    S3TablesRest(S3TablesRestIcebergCatalog<C>),
}
489
490impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogImpl, R>
491 for IcebergCatalogImpl<ReferencedConnection>
492{
493 fn into_inline_connection(self, r: R) -> IcebergCatalogImpl {
494 match self {
495 IcebergCatalogImpl::Rest(rest) => IcebergCatalogImpl::Rest(rest),
496 IcebergCatalogImpl::S3TablesRest(s3tables) => {
497 IcebergCatalogImpl::S3TablesRest(s3tables.into_inline_connection(r))
498 }
499 }
500 }
501}
502
/// A connection to an Iceberg catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergCatalogConnection<C: ConnectionAccess = InlinedConnection> {
    /// The specific catalog implementation (REST or S3 Tables REST).
    pub catalog: IcebergCatalogImpl<C>,
    /// Base URI of the catalog's REST endpoint.
    pub uri: reqwest::Url,
}
510
impl AlterCompatible for IcebergCatalogConnection {
    /// Always reports incompatibility: no in-place `ALTER` of an Iceberg
    /// catalog connection is currently supported.
    fn alter_compatible(&self, id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        Err(AlterError { id })
    }
}
516
517impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogConnection, R>
518 for IcebergCatalogConnection<ReferencedConnection>
519{
520 fn into_inline_connection(self, r: R) -> IcebergCatalogConnection {
521 IcebergCatalogConnection {
522 catalog: self.catalog.into_inline_connection(&r),
523 uri: self.uri,
524 }
525 }
526}
527
impl<C: ConnectionAccess> IcebergCatalogConnection<C> {
    /// Iceberg catalog connections are validated on creation by default.
    fn validate_by_default(&self) -> bool {
        true
    }
}
533
534impl IcebergCatalogConnection<InlinedConnection> {
535 pub async fn connect(
536 &self,
537 storage_configuration: &StorageConfiguration,
538 in_task: InTask,
539 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
540 match self.catalog {
541 IcebergCatalogImpl::S3TablesRest(ref s3tables) => {
542 self.connect_s3tables(s3tables, storage_configuration, in_task)
543 .await
544 }
545 IcebergCatalogImpl::Rest(ref rest) => {
546 self.connect_rest(rest, storage_configuration, in_task)
547 .await
548 }
549 }
550 }
551
552 pub fn catalog_type(&self) -> IcebergCatalogType {
553 match self.catalog {
554 IcebergCatalogImpl::S3TablesRest(_) => IcebergCatalogType::S3TablesRest,
555 IcebergCatalogImpl::Rest(_) => IcebergCatalogType::Rest,
556 }
557 }
558
559 pub fn s3tables_catalog(&self) -> Option<&S3TablesRestIcebergCatalog> {
560 match &self.catalog {
561 IcebergCatalogImpl::S3TablesRest(s3tables) => Some(s3tables),
562 _ => None,
563 }
564 }
565
566 pub fn rest_catalog(&self) -> Option<&RestIcebergCatalog> {
567 match &self.catalog {
568 IcebergCatalogImpl::Rest(rest) => Some(rest),
569 _ => None,
570 }
571 }
572
573 async fn connect_s3tables(
574 &self,
575 s3tables: &S3TablesRestIcebergCatalog,
576 storage_configuration: &StorageConfiguration,
577 in_task: InTask,
578 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
579 let secret_reader = &storage_configuration.connection_context.secrets_reader;
580 let aws_ref = &s3tables.aws_connection;
581 let aws_config = aws_ref
582 .connection
583 .load_sdk_config(
584 &storage_configuration.connection_context,
585 aws_ref.connection_id,
586 in_task,
587 )
588 .await
589 .with_context(|| {
590 format!(
591 "failed to load AWS SDK config for S3 Tables Iceberg catalog \
592 (connection id: {}, auth method: {}, catalog uri: {}, warehouse: {})",
593 aws_ref.connection_id,
594 aws_ref.connection.auth_method(),
595 self.uri,
596 s3tables.warehouse
597 )
598 })?;
599
600 let aws_region = aws_ref
601 .connection
602 .region
603 .clone()
604 .unwrap_or_else(|| "us-east-1".to_string());
605
606 let mut props = vec![
607 (S3_REGION.to_string(), aws_region),
608 (S3_DISABLE_EC2_METADATA.to_string(), "true".to_string()),
609 (
610 REST_CATALOG_PROP_WAREHOUSE.to_string(),
611 s3tables.warehouse.clone(),
612 ),
613 (REST_CATALOG_PROP_URI.to_string(), self.uri.to_string()),
614 ];
615
616 let aws_auth = aws_ref.connection.auth.clone();
617
618 if let AwsAuth::Credentials(creds) = &aws_auth {
619 props.push((
620 S3_ACCESS_KEY_ID.to_string(),
621 creds
622 .access_key_id
623 .get_string(in_task, secret_reader)
624 .await?,
625 ));
626 props.push((
627 S3_SECRET_ACCESS_KEY.to_string(),
628 secret_reader.read_string(creds.secret_access_key).await?,
629 ));
630 }
631
632 let catalog = RestCatalogBuilder::default()
636 .with_aws_config(aws_config.clone())
637 .load("IcebergCatalog", props.into_iter().collect())
638 .await
639 .with_context(|| {
640 format!(
641 "failed to create S3 Tables Iceberg catalog \
642 (connection id: {}, catalog uri: {}, warehouse: {})",
643 aws_ref.connection_id, self.uri, s3tables.warehouse
644 )
645 })?;
646
647 let catalog = if matches!(aws_auth, AwsAuth::AssumeRole(_)) {
648 let credentials_provider = aws_config
649 .credentials_provider()
650 .ok_or_else(|| anyhow!("aws_config missing credentials provider"))?;
651 let file_io_loader = CustomAwsCredentialLoader::new(Arc::new(
652 AwsSdkCredentialLoader::new(credentials_provider),
653 ));
654 catalog.with_file_io_extension(file_io_loader)
655 } else {
656 catalog
657 };
658
659 Ok(Arc::new(catalog))
660 }
661
662 async fn connect_rest(
663 &self,
664 rest: &RestIcebergCatalog,
665 storage_configuration: &StorageConfiguration,
666 in_task: InTask,
667 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
668 let mut props = BTreeMap::from([(
669 REST_CATALOG_PROP_URI.to_string(),
670 self.uri.to_string().clone(),
671 )]);
672
673 if let Some(warehouse) = &rest.warehouse {
674 props.insert(REST_CATALOG_PROP_WAREHOUSE.to_string(), warehouse.clone());
675 }
676
677 let credential = rest
678 .credential
679 .get_string(
680 in_task,
681 &storage_configuration.connection_context.secrets_reader,
682 )
683 .await
684 .map_err(|e| anyhow!("failed to read Iceberg catalog credential: {e}"))?;
685 props.insert(REST_CATALOG_PROP_CREDENTIAL.to_string(), credential);
686
687 if let Some(scope) = &rest.scope {
688 props.insert(REST_CATALOG_PROP_SCOPE.to_string(), scope.clone());
689 }
690
691 let catalog = RestCatalogBuilder::default()
692 .load("IcebergCatalog", props.into_iter().collect())
693 .await
694 .map_err(|e| anyhow!("failed to create Iceberg catalog: {e}"))?;
695 Ok(Arc::new(catalog))
696 }
697
698 async fn validate(
699 &self,
700 _id: CatalogItemId,
701 storage_configuration: &StorageConfiguration,
702 ) -> Result<(), ConnectionValidationError> {
703 let catalog = self
704 .connect(storage_configuration, InTask::No)
705 .await
706 .map_err(|e| {
707 ConnectionValidationError::Other(anyhow!("failed to connect to catalog: {e}"))
708 })?;
709
710 catalog.list_namespaces(None).await.map_err(|e| {
712 ConnectionValidationError::Other(anyhow!("failed to list namespaces: {e}"))
713 })?;
714
715 Ok(())
716 }
717}
718
/// Details of an AWS PrivateLink service.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelinkConnection {
    /// Name of the PrivateLink service.
    pub service_name: String,
    /// Availability zones in which the service is offered.
    pub availability_zones: Vec<String>,
}
724
impl AlterCompatible for AwsPrivatelinkConnection {
    /// Unconditionally compatible: no field of this connection is pinned
    /// across an `ALTER`.
    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        Ok(())
    }
}
731
/// TLS configuration for a Kafka connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaTlsConfig {
    /// Client identity (key and certificate), when mutual TLS is used.
    pub identity: Option<TlsIdentity>,
    /// CA root certificate used to verify the broker, if any.
    pub root_cert: Option<StringOrSecret>,
}
737
/// SASL authentication configuration for a Kafka connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaSaslConfig<C: ConnectionAccess = InlinedConnection> {
    /// SASL mechanism name (passed to librdkafka's `sasl.mechanisms`).
    pub mechanism: String,
    /// SASL username (passed to `sasl.username`).
    pub username: StringOrSecret,
    /// Secret holding the SASL password, if any.
    pub password: Option<CatalogItemId>,
    /// AWS connection reference used when SASL authenticates via AWS
    /// credentials, if any.
    pub aws: Option<AwsConnectionReference<C>>,
}
745
746impl<R: ConnectionResolver> IntoInlineConnection<KafkaSaslConfig, R>
747 for KafkaSaslConfig<ReferencedConnection>
748{
749 fn into_inline_connection(self, r: R) -> KafkaSaslConfig {
750 KafkaSaslConfig {
751 mechanism: self.mechanism,
752 username: self.username,
753 password: self.password,
754 aws: self.aws.map(|aws| aws.into_inline_connection(&r)),
755 }
756 }
757}
758
/// A single Kafka broker endpoint plus the tunnel used to reach it.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaBroker<C: ConnectionAccess = InlinedConnection> {
    /// Broker address as `host:port` (the port defaults to 9092 when absent).
    pub address: String,
    /// Tunnel used when connecting to this broker.
    pub tunnel: Tunnel<C>,
}
767
768impl<R: ConnectionResolver> IntoInlineConnection<KafkaBroker, R>
769 for KafkaBroker<ReferencedConnection>
770{
771 fn into_inline_connection(self, r: R) -> KafkaBroker {
772 let KafkaBroker { address, tunnel } = self;
773 KafkaBroker {
774 address,
775 tunnel: tunnel.into_inline_connection(r),
776 }
777 }
778}
779
/// Options used when creating a Kafka topic (e.g. the progress topic).
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Default)]
pub struct KafkaTopicOptions {
    /// Replication factor for the topic, if explicitly configured.
    pub replication_factor: Option<NonNeg<i32>>,
    /// Partition count for the topic, if explicitly configured.
    pub partition_count: Option<NonNeg<i32>>,
    /// Additional topic-level configuration entries, passed through verbatim.
    pub topic_config: BTreeMap<String, String>,
}
791
/// A connection to a Kafka cluster.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaConnection<C: ConnectionAccess = InlinedConnection> {
    /// Broker addresses (and per-broker tunnels) to bootstrap from.
    pub brokers: Vec<KafkaBroker<C>>,
    /// Tunnel used for brokers without an explicit per-broker tunnel.
    pub default_tunnel: Tunnel<C>,
    /// Explicit progress topic name, if configured; otherwise a name is
    /// derived from the environment and connection IDs.
    pub progress_topic: Option<String>,
    /// Options applied when creating the progress topic.
    pub progress_topic_options: KafkaTopicOptions,
    /// Additional librdkafka options, passed through to the client config.
    pub options: BTreeMap<String, StringOrSecret>,
    /// TLS configuration, if any.
    pub tls: Option<KafkaTlsConfig>,
    /// SASL configuration, if any.
    pub sasl: Option<KafkaSaslConfig<C>>,
}
805
806impl<R: ConnectionResolver> IntoInlineConnection<KafkaConnection, R>
807 for KafkaConnection<ReferencedConnection>
808{
809 fn into_inline_connection(self, r: R) -> KafkaConnection {
810 let KafkaConnection {
811 brokers,
812 progress_topic,
813 progress_topic_options,
814 default_tunnel,
815 options,
816 tls,
817 sasl,
818 } = self;
819
820 let brokers = brokers
821 .into_iter()
822 .map(|broker| broker.into_inline_connection(&r))
823 .collect();
824
825 KafkaConnection {
826 brokers,
827 progress_topic,
828 progress_topic_options,
829 default_tunnel: default_tunnel.into_inline_connection(&r),
830 options,
831 tls,
832 sasl: sasl.map(|sasl| sasl.into_inline_connection(&r)),
833 }
834 }
835}
836
837impl<C: ConnectionAccess> KafkaConnection<C> {
838 pub fn progress_topic(
843 &self,
844 connection_context: &ConnectionContext,
845 connection_id: CatalogItemId,
846 ) -> Cow<'_, str> {
847 if let Some(progress_topic) = &self.progress_topic {
848 Cow::Borrowed(progress_topic)
849 } else {
850 Cow::Owned(format!(
851 "_materialize-progress-{}-{}",
852 connection_context.environment_id, connection_id,
853 ))
854 }
855 }
856
857 fn validate_by_default(&self) -> bool {
858 true
859 }
860}
861
862impl KafkaConnection {
863 pub fn id_base(
867 connection_context: &ConnectionContext,
868 connection_id: CatalogItemId,
869 object_id: GlobalId,
870 ) -> String {
871 format!(
872 "materialize-{}-{}-{}",
873 connection_context.environment_id, connection_id, object_id,
874 )
875 }
876
877 pub fn enrich_client_id(&self, configs: &ConfigSet, client_id: &mut String) {
880 #[derive(Debug, Deserialize)]
881 struct EnrichmentRule {
882 #[serde(deserialize_with = "deserialize_regex")]
883 pattern: Regex,
884 payload: String,
885 }
886
887 fn deserialize_regex<'de, D>(deserializer: D) -> Result<Regex, D::Error>
888 where
889 D: Deserializer<'de>,
890 {
891 let buf = String::deserialize(deserializer)?;
892 Regex::new(&buf).map_err(serde::de::Error::custom)
893 }
894
895 let rules = KAFKA_CLIENT_ID_ENRICHMENT_RULES.get(configs);
896 let rules = match serde_json::from_value::<Vec<EnrichmentRule>>(rules) {
897 Ok(rules) => rules,
898 Err(e) => {
899 warn!(%e, "failed to decode kafka_client_id_enrichment_rules");
900 return;
901 }
902 };
903
904 debug!(?self.brokers, "evaluating client ID enrichment rules");
909 for rule in rules {
910 let is_match = self
911 .brokers
912 .iter()
913 .any(|b| rule.pattern.is_match(&b.address));
914 debug!(?rule, is_match, "evaluated client ID enrichment rule");
915 if is_match {
916 client_id.push('-');
917 client_id.push_str(&rule.payload);
918 }
919 }
920 }
921
922 pub async fn create_with_context<C, T>(
924 &self,
925 storage_configuration: &StorageConfiguration,
926 context: C,
927 extra_options: &BTreeMap<&str, String>,
928 in_task: InTask,
929 ) -> Result<T, ContextCreationError>
930 where
931 C: ClientContext,
932 T: FromClientConfigAndContext<TunnelingClientContext<C>>,
933 {
934 let mut options = self.options.clone();
935
936 options.insert("allow.auto.create.topics".into(), "false".into());
941
942 let brokers = match &self.default_tunnel {
943 Tunnel::AwsPrivatelink(t) => {
944 assert!(&self.brokers.is_empty());
945
946 let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM
947 .get(storage_configuration.config_set());
948 options.insert("ssl.endpoint.identification.algorithm".into(), algo.into());
949
950 format!(
953 "{}:{}",
954 vpc_endpoint_host(
955 t.connection_id,
956 None, ),
958 t.port.unwrap_or(9092)
959 )
960 }
961 _ => self.brokers.iter().map(|b| &b.address).join(","),
962 };
963 options.insert("bootstrap.servers".into(), brokers.into());
964 let security_protocol = match (self.tls.is_some(), self.sasl.is_some()) {
965 (false, false) => "PLAINTEXT",
966 (true, false) => "SSL",
967 (false, true) => "SASL_PLAINTEXT",
968 (true, true) => "SASL_SSL",
969 };
970 options.insert("security.protocol".into(), security_protocol.into());
971 if let Some(tls) = &self.tls {
972 if let Some(root_cert) = &tls.root_cert {
973 options.insert("ssl.ca.pem".into(), root_cert.clone());
974 }
975 if let Some(identity) = &tls.identity {
976 options.insert("ssl.key.pem".into(), StringOrSecret::Secret(identity.key));
977 options.insert("ssl.certificate.pem".into(), identity.cert.clone());
978 }
979 }
980 if let Some(sasl) = &self.sasl {
981 options.insert("sasl.mechanisms".into(), (&sasl.mechanism).into());
982 options.insert("sasl.username".into(), sasl.username.clone());
983 if let Some(password) = sasl.password {
984 options.insert("sasl.password".into(), StringOrSecret::Secret(password));
985 }
986 }
987
988 options.insert(
989 "retry.backoff.ms".into(),
990 KAFKA_RETRY_BACKOFF
991 .get(storage_configuration.config_set())
992 .as_millis()
993 .into(),
994 );
995 options.insert(
996 "retry.backoff.max.ms".into(),
997 KAFKA_RETRY_BACKOFF_MAX
998 .get(storage_configuration.config_set())
999 .as_millis()
1000 .into(),
1001 );
1002 options.insert(
1003 "reconnect.backoff.ms".into(),
1004 KAFKA_RECONNECT_BACKOFF
1005 .get(storage_configuration.config_set())
1006 .as_millis()
1007 .into(),
1008 );
1009 options.insert(
1010 "reconnect.backoff.max.ms".into(),
1011 KAFKA_RECONNECT_BACKOFF_MAX
1012 .get(storage_configuration.config_set())
1013 .as_millis()
1014 .into(),
1015 );
1016
1017 let mut config = mz_kafka_util::client::create_new_client_config(
1018 storage_configuration
1019 .connection_context
1020 .librdkafka_log_level,
1021 storage_configuration.parameters.kafka_timeout_config,
1022 );
1023 for (k, v) in options {
1024 config.set(
1025 k,
1026 v.get_string(
1027 in_task,
1028 &storage_configuration.connection_context.secrets_reader,
1029 )
1030 .await
1031 .context("reading kafka secret")?,
1032 );
1033 }
1034 for (k, v) in extra_options {
1035 config.set(*k, v);
1036 }
1037
1038 let aws_config = match self.sasl.as_ref().and_then(|sasl| sasl.aws.as_ref()) {
1039 None => None,
1040 Some(aws) => Some(
1041 aws.connection
1042 .load_sdk_config(
1043 &storage_configuration.connection_context,
1044 aws.connection_id,
1045 in_task,
1046 )
1047 .await?,
1048 ),
1049 };
1050
1051 let mut context = TunnelingClientContext::new(
1055 context,
1056 Handle::current(),
1057 storage_configuration
1058 .connection_context
1059 .ssh_tunnel_manager
1060 .clone(),
1061 storage_configuration.parameters.ssh_timeout_config,
1062 aws_config,
1063 in_task,
1064 );
1065
1066 match &self.default_tunnel {
1067 Tunnel::Direct => {
1068 }
1070 Tunnel::AwsPrivatelink(pl) => {
1071 context.set_default_tunnel(TunnelConfig::StaticHost(vpc_endpoint_host(
1072 pl.connection_id,
1073 None, )));
1075 }
1076 Tunnel::Ssh(ssh_tunnel) => {
1077 let secret = storage_configuration
1078 .connection_context
1079 .secrets_reader
1080 .read_in_task_if(in_task, ssh_tunnel.connection_id)
1081 .await?;
1082 let key_pair = SshKeyPair::from_bytes(&secret)?;
1083
1084 let resolved = resolve_address(
1086 &ssh_tunnel.connection.host,
1087 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1088 )
1089 .await?;
1090 context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig {
1091 host: resolved
1092 .iter()
1093 .map(|a| a.to_string())
1094 .collect::<BTreeSet<_>>(),
1095 port: ssh_tunnel.connection.port,
1096 user: ssh_tunnel.connection.user.clone(),
1097 key_pair,
1098 }));
1099 }
1100 }
1101
1102 for broker in &self.brokers {
1103 let mut addr_parts = broker.address.splitn(2, ':');
1104 let addr = BrokerAddr {
1105 host: addr_parts
1106 .next()
1107 .context("BROKER is not address:port")?
1108 .into(),
1109 port: addr_parts
1110 .next()
1111 .unwrap_or("9092")
1112 .parse()
1113 .context("parsing BROKER port")?,
1114 };
1115 match &broker.tunnel {
1116 Tunnel::Direct => {
1117 }
1127 Tunnel::AwsPrivatelink(aws_privatelink) => {
1128 let host = mz_cloud_resources::vpc_endpoint_host(
1129 aws_privatelink.connection_id,
1130 aws_privatelink.availability_zone.as_deref(),
1131 );
1132 let port = aws_privatelink.port;
1133 context.add_broker_rewrite(
1134 addr,
1135 BrokerRewrite {
1136 host: host.clone(),
1137 port,
1138 },
1139 );
1140 }
1141 Tunnel::Ssh(ssh_tunnel) => {
1142 let ssh_host_resolved = resolve_address(
1144 &ssh_tunnel.connection.host,
1145 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1146 )
1147 .await?;
1148 context
1149 .add_ssh_tunnel(
1150 addr,
1151 SshTunnelConfig {
1152 host: ssh_host_resolved
1153 .iter()
1154 .map(|a| a.to_string())
1155 .collect::<BTreeSet<_>>(),
1156 port: ssh_tunnel.connection.port,
1157 user: ssh_tunnel.connection.user.clone(),
1158 key_pair: SshKeyPair::from_bytes(
1159 &storage_configuration
1160 .connection_context
1161 .secrets_reader
1162 .read_in_task_if(in_task, ssh_tunnel.connection_id)
1163 .await?,
1164 )?,
1165 },
1166 )
1167 .await
1168 .map_err(ContextCreationError::Ssh)?;
1169 }
1170 }
1171 }
1172
1173 Ok(config.create_with_context(context)?)
1174 }
1175
    /// Verifies that this connection can reach the configured Kafka cluster
    /// by fetching cluster metadata with the configured timeout.
    async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<(), anyhow::Error> {
        // Collect client-level errors on a side channel so that, on failure,
        // we can surface something more specific than the generic error
        // librdkafka reports for the metadata fetch.
        let (context, error_rx) = MzClientContext::with_errors();
        let consumer: BaseConsumer<_> = self
            .create_with_context(
                storage_configuration,
                context,
                &BTreeMap::new(),
                // `InTask::No`: validation runs directly on the current task.
                InTask::No,
            )
            .await?;
        let consumer = Arc::new(consumer);

        let timeout = storage_configuration
            .parameters
            .kafka_timeout_config
            .fetch_metadata_timeout;

        // `fetch_metadata` blocks, so run it on the blocking task pool rather
        // than stalling the async executor.
        let result = mz_ore::task::spawn_blocking(|| "kafka_get_metadata", {
            let consumer = Arc::clone(&consumer);
            move || consumer.fetch_metadata(None, timeout)
        })
        .await;
        match result {
            Ok(_) => Ok(()),
            Err(err) => {
                // Prefer the first non-`Internal` error reported by the client
                // context, if any; the `reduce` keeps the current candidate
                // unless it is `Internal`, in which case it moves on.
                let main_err = error_rx.try_iter().reduce(|cur, new| match cur {
                    MzKafkaError::Internal(_) => new,
                    _ => cur,
                });

                drop(consumer);

                // Fall back to the metadata-fetch error when the context
                // reported nothing.
                match main_err {
                    Some(err) => Err(err.into()),
                    None => Err(err.into()),
                }
            }
        }
    }
1240}
1241
1242impl<C: ConnectionAccess> AlterCompatible for KafkaConnection<C> {
1243 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1244 let KafkaConnection {
1245 brokers: _,
1246 default_tunnel: _,
1247 progress_topic,
1248 progress_topic_options,
1249 options: _,
1250 tls: _,
1251 sasl: _,
1252 } = self;
1253
1254 let compatibility_checks = [
1255 (progress_topic == &other.progress_topic, "progress_topic"),
1256 (
1257 progress_topic_options == &other.progress_topic_options,
1258 "progress_topic_options",
1259 ),
1260 ];
1261
1262 for (compatible, field) in compatibility_checks {
1263 if !compatible {
1264 tracing::warn!(
1265 "KafkaConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1266 self,
1267 other
1268 );
1269
1270 return Err(AlterError { id });
1271 }
1272 }
1273
1274 Ok(())
1275 }
1276}
1277
/// A connection to a Confluent Schema Registry.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnection<C: ConnectionAccess = InlinedConnection> {
    /// The URL of the schema registry.
    pub url: Url,
    /// Trusted root TLS certificate, in PEM format.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication with the schema
    /// registry.
    pub tls_identity: Option<TlsIdentity>,
    /// Optional HTTP authentication credentials for the schema registry.
    pub http_auth: Option<CsrConnectionHttpAuth>,
    /// A tunnel through which to route traffic to the registry.
    pub tunnel: Tunnel<C>,
}
1293
1294impl<R: ConnectionResolver> IntoInlineConnection<CsrConnection, R>
1295 for CsrConnection<ReferencedConnection>
1296{
1297 fn into_inline_connection(self, r: R) -> CsrConnection {
1298 let CsrConnection {
1299 url,
1300 tls_root_cert,
1301 tls_identity,
1302 http_auth,
1303 tunnel,
1304 } = self;
1305 CsrConnection {
1306 url,
1307 tls_root_cert,
1308 tls_identity,
1309 http_auth,
1310 tunnel: tunnel.into_inline_connection(r),
1311 }
1312 }
1313}
1314
impl<C: ConnectionAccess> CsrConnection<C> {
    /// Reports whether this connection type is validated by default when
    /// created. Always `true` for schema registry connections.
    fn validate_by_default(&self) -> bool {
        true
    }
}
1320
impl CsrConnection {
    /// Establishes a client to the schema registry, applying the TLS, HTTP
    /// authentication, and tunnel configuration recorded on this connection.
    pub async fn connect(
        &self,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_ccsr::Client, CsrConnectError> {
        let mut client_config = mz_ccsr::ClientConfig::new(self.url.clone());
        // Install the user-provided root certificate, if any.
        if let Some(root_cert) = &self.tls_root_cert {
            let root_cert = root_cert
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let root_cert = Certificate::from_pem(root_cert.as_bytes())?;
            client_config = client_config.add_root_certificate(root_cert);
        }

        // Install the TLS client identity (certificate + secret key), if any.
        if let Some(tls_identity) = &self.tls_identity {
            let key = &storage_configuration
                .connection_context
                .secrets_reader
                .read_string_in_task_if(in_task, tls_identity.key)
                .await?;
            let cert = tls_identity
                .cert
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let ident = Identity::from_pem(key.as_bytes(), cert.as_bytes())?;
            client_config = client_config.identity(ident);
        }

        // Install HTTP basic-auth credentials, if any; the password is read
        // from the secrets store.
        if let Some(http_auth) = &self.http_auth {
            let username = http_auth
                .username
                .get_string(
                    in_task,
                    &storage_configuration.connection_context.secrets_reader,
                )
                .await?;
            let password = match http_auth.password {
                None => None,
                Some(password) => Some(
                    storage_configuration
                        .connection_context
                        .secrets_reader
                        .read_string_in_task_if(in_task, password)
                        .await?,
                ),
            };
            client_config = client_config.auth(username, password);
        }

        let host = self
            .url
            .host_str()
            .ok_or_else(|| anyhow!("url missing host"))?;
        match &self.tunnel {
            Tunnel::Direct => {
                // Resolve the hostname eagerly so the external-address policy
                // can be enforced on the resolved IPs.
                let resolved = resolve_address(
                    host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                client_config = client_config.resolve_to_addrs(
                    host,
                    // NOTE(review): the port component of these addrs appears
                    // unused by address resolution — confirm against
                    // `mz_ccsr::ClientConfig::resolve_to_addrs`.
                    &resolved
                        .iter()
                        .map(|addr| SocketAddr::new(*addr, 0))
                        .collect::<Vec<_>>(),
                )
            }
            Tunnel::Ssh(ssh_tunnel) => {
                let ssh_tunnel = ssh_tunnel
                    .connect(
                        storage_configuration,
                        host,
                        // NOTE(review): `Url::port()` returns `None` for a
                        // scheme's default port, so an `https` URL without an
                        // explicit port tunnels to 80 here — confirm whether
                        // `port_or_known_default()` is intended instead.
                        self.url.port().unwrap_or(80),
                        in_task,
                    )
                    .await
                    .map_err(CsrConnectError::Ssh)?;

                // Route the registry host to the local end of the SSH tunnel,
                // and recompute the URL's port on every request so it follows
                // the tunnel's current local port. The closure takes ownership
                // of the tunnel handle, so the handle lives as long as the
                // client's URL callback.
                client_config = client_config
                    .resolve_to_addrs(host, &[SocketAddr::new(ssh_tunnel.local_addr().ip(), 0)])
                    .dynamic_url({
                        let remote_url = self.url.clone();
                        move || {
                            let mut url = remote_url.clone();
                            url.set_port(Some(ssh_tunnel.local_addr().port()))
                                .expect("cannot fail");
                            url
                        }
                    });
            }
            Tunnel::AwsPrivatelink(connection) => {
                // CSR tunnels do not support overriding the port.
                assert_none!(connection.port);

                // Route the registry host to the VPC endpoint's addresses.
                let privatelink_host = mz_cloud_resources::vpc_endpoint_host(
                    connection.connection_id,
                    connection.availability_zone.as_deref(),
                );
                let addrs: Vec<_> = net::lookup_host((privatelink_host, 0))
                    .await
                    .context("resolving PrivateLink host")?
                    .collect();
                client_config = client_config.resolve_to_addrs(host, &addrs)
            }
        }

        Ok(client_config.build()?)
    }

    /// Validates the connection by listing the registry's subjects.
    async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<(), anyhow::Error> {
        let client = self
            .connect(
                storage_configuration,
                // `InTask::No`: validation runs directly on the current task.
                InTask::No,
            )
            .await?;
        client.list_subjects().await?;
        Ok(())
    }
}
1479
1480impl<C: ConnectionAccess> AlterCompatible for CsrConnection<C> {
1481 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1482 let CsrConnection {
1483 tunnel,
1484 url: _,
1486 tls_root_cert: _,
1487 tls_identity: _,
1488 http_auth: _,
1489 } = self;
1490
1491 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1492
1493 for (compatible, field) in compatibility_checks {
1494 if !compatible {
1495 tracing::warn!(
1496 "CsrConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1497 self,
1498 other
1499 );
1500
1501 return Err(AlterError { id });
1502 }
1503 }
1504 Ok(())
1505 }
1506}
1507
/// A TLS client certificate together with the secret holding its private key.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct TlsIdentity {
    /// The client's TLS certificate, in PEM format.
    pub cert: StringOrSecret,
    /// The ID of the secret containing the client's TLS private key.
    pub key: CatalogItemId,
}
1517
/// HTTP authentication credentials for a schema registry connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnectionHttpAuth {
    /// The username.
    pub username: StringOrSecret,
    /// The ID of the secret containing the password, if any.
    pub password: Option<CatalogItemId>,
}
1526
/// A connection to a PostgreSQL server.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct PostgresConnection<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// The name of the database to connect to.
    pub database: String,
    /// The username to authenticate as.
    pub user: StringOrSecret,
    /// The ID of the secret containing the password, if any.
    pub password: Option<CatalogItemId>,
    /// A tunnel through which to route traffic to the server.
    pub tunnel: Tunnel<C>,
    /// The TLS mode to use when connecting.
    pub tls_mode: SslMode,
    /// An optional root TLS certificate, in PEM format, used to verify the
    /// server's certificate.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication.
    pub tls_identity: Option<TlsIdentity>,
}
1550
1551impl<R: ConnectionResolver> IntoInlineConnection<PostgresConnection, R>
1552 for PostgresConnection<ReferencedConnection>
1553{
1554 fn into_inline_connection(self, r: R) -> PostgresConnection {
1555 let PostgresConnection {
1556 host,
1557 port,
1558 database,
1559 user,
1560 password,
1561 tunnel,
1562 tls_mode,
1563 tls_root_cert,
1564 tls_identity,
1565 } = self;
1566
1567 PostgresConnection {
1568 host,
1569 port,
1570 database,
1571 user,
1572 password,
1573 tunnel: tunnel.into_inline_connection(r),
1574 tls_mode,
1575 tls_root_cert,
1576 tls_identity,
1577 }
1578 }
1579}
1580
impl<C: ConnectionAccess> PostgresConnection<C> {
    /// Reports whether this connection type is validated by default when
    /// created. Always `true` for PostgreSQL connections.
    fn validate_by_default(&self) -> bool {
        true
    }
}
1586
impl PostgresConnection<InlinedConnection> {
    /// Builds an [`mz_postgres_util::Config`] for this connection, reading
    /// any required secrets and resolving the tunnel configuration.
    pub async fn config(
        &self,
        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_postgres_util::Config, anyhow::Error> {
        let params = &storage_configuration.parameters;

        let mut config = tokio_postgres::Config::new();
        config
            .host(&self.host)
            .port(self.port)
            .dbname(&self.database)
            .user(&self.user.get_string(in_task, secrets_reader).await?)
            .ssl_mode(self.tls_mode);
        if let Some(password) = self.password {
            let password = secrets_reader
                .read_string_in_task_if(in_task, password)
                .await?;
            config.password(password);
        }
        if let Some(tls_root_cert) = &self.tls_root_cert {
            let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
            config.ssl_root_cert(tls_root_cert.as_bytes());
        }
        if let Some(tls_identity) = &self.tls_identity {
            let cert = tls_identity
                .cert
                .get_string(in_task, secrets_reader)
                .await?;
            let key = secrets_reader
                .read_string_in_task_if(in_task, tls_identity.key)
                .await?;
            config.ssl_cert(cert.as_bytes()).ssl_key(key.as_bytes());
        }

        // Apply operator-configured client-side timeouts and TCP keepalives.
        if let Some(connect_timeout) = params.pg_source_connect_timeout {
            config.connect_timeout(connect_timeout);
        }
        if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
            config.keepalives_retries(keepalives_retries);
        }
        if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
            config.keepalives_idle(keepalives_idle);
        }
        if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
            config.keepalives_interval(keepalives_interval);
        }
        if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
            config.tcp_user_timeout(tcp_user_timeout);
        }

        // Server-side settings forwarded via the `options` startup parameter.
        let mut options = vec![];
        if let Some(wal_sender_timeout) = params.pg_source_wal_sender_timeout {
            options.push(format!(
                "--wal_sender_timeout={}",
                wal_sender_timeout.as_millis()
            ));
        };
        if params.pg_source_tcp_configure_server {
            // Reuse the same keepalive/timeout parameters to configure the
            // server's side of the TCP connection as well.
            if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
                options.push(format!("--tcp_keepalives_count={}", keepalives_retries));
            }
            if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
                options.push(format!(
                    "--tcp_keepalives_idle={}",
                    keepalives_idle.as_secs()
                ));
            }
            if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
                options.push(format!(
                    "--tcp_keepalives_interval={}",
                    keepalives_interval.as_secs()
                ));
            }
            if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
                options.push(format!(
                    "--tcp_user_timeout={}",
                    tcp_user_timeout.as_millis()
                ));
            }
        }
        config.options(options.join(" ").as_str());

        let tunnel = match &self.tunnel {
            Tunnel::Direct => {
                // Resolve the hostname eagerly so the external-address policy
                // can be enforced on the resolved IPs.
                let resolved = resolve_address(
                    &self.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_postgres_util::TunnelConfig::Direct {
                    resolved_ips: Some(resolved),
                }
            }
            Tunnel::Ssh(SshTunnel {
                connection_id,
                connection,
            }) => {
                // The SSH key pair is stored as a secret.
                let secret = secrets_reader
                    .read_in_task_if(in_task, *connection_id)
                    .await?;
                let key_pair = SshKeyPair::from_bytes(&secret)?;
                let resolved = resolve_address(
                    &connection.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_postgres_util::TunnelConfig::Ssh {
                    config: SshTunnelConfig {
                        host: resolved
                            .iter()
                            .map(|a| a.to_string())
                            .collect::<BTreeSet<_>>(),
                        port: connection.port,
                        user: connection.user.clone(),
                        key_pair,
                    },
                }
            }
            Tunnel::AwsPrivatelink(connection) => {
                // PostgreSQL tunnels do not support overriding the port.
                assert_none!(connection.port);
                mz_postgres_util::TunnelConfig::AwsPrivatelink {
                    connection_id: connection.connection_id,
                }
            }
        };

        Ok(mz_postgres_util::Config::new(
            config,
            tunnel,
            params.ssh_timeout_config,
            in_task,
        )?)
    }

    /// Validates the connection by connecting to the upstream PostgreSQL
    /// server and checking that it is configured to support logical
    /// replication. Returns the established client on success.
    pub async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<mz_postgres_util::Client, anyhow::Error> {
        let config = self
            .config(
                &storage_configuration.connection_context.secrets_reader,
                storage_configuration,
                // `InTask::No`: validation runs directly on the current task.
                InTask::No,
            )
            .await?;
        let client = config
            .connect(
                "connection validation",
                &storage_configuration.connection_context.ssh_tunnel_manager,
            )
            .await?;

        // Logical replication requires `wal_level >= logical`.
        let wal_level = mz_postgres_util::get_wal_level(&client).await?;

        if wal_level < mz_postgres_util::replication::WalLevel::Logical {
            Err(PostgresConnectionValidationError::InsufficientWalLevel { wal_level })?;
        }

        // `max_wal_senders < 1` means the server cannot serve replication.
        let max_wal_senders = mz_postgres_util::get_max_wal_senders(&client).await?;

        if max_wal_senders < 1 {
            Err(PostgresConnectionValidationError::ReplicationDisabled)?;
        }

        let available_replication_slots =
            mz_postgres_util::available_replication_slots(&client).await?;

        // NOTE(review): this assumes ingestion requires two replication
        // slots — confirm against the source implementation.
        if available_replication_slots < 2 {
            Err(
                PostgresConnectionValidationError::InsufficientReplicationSlotsAvailable {
                    count: 2,
                },
            )?;
        }

        Ok(client)
    }
}
1773
/// Errors that can arise while validating a [`PostgresConnection`].
#[derive(Debug, Clone, thiserror::Error)]
pub enum PostgresConnectionValidationError {
    /// Fewer replication slots are available than required.
    #[error("PostgreSQL server has insufficient number of replication slots available")]
    InsufficientReplicationSlotsAvailable { count: usize },
    /// The server's `wal_level` is below `logical`.
    #[error("server must have wal_level >= logical, but has {wal_level}")]
    InsufficientWalLevel {
        wal_level: mz_postgres_util::replication::WalLevel,
    },
    /// The server does not permit any WAL senders.
    #[error("replication disabled on server")]
    ReplicationDisabled,
}
1785
1786impl PostgresConnectionValidationError {
1787 pub fn detail(&self) -> Option<String> {
1788 match self {
1789 Self::InsufficientReplicationSlotsAvailable { count } => Some(format!(
1790 "executing this statement requires {} replication slot{}",
1791 count,
1792 if *count == 1 { "" } else { "s" }
1793 )),
1794 _ => None,
1795 }
1796 }
1797
1798 pub fn hint(&self) -> Option<String> {
1799 match self {
1800 Self::InsufficientReplicationSlotsAvailable { .. } => Some(
1801 "you might be able to wait for other sources to finish snapshotting and try again"
1802 .into(),
1803 ),
1804 Self::ReplicationDisabled => Some("set max_wal_senders to a value > 0".into()),
1805 _ => None,
1806 }
1807 }
1808}
1809
1810impl<C: ConnectionAccess> AlterCompatible for PostgresConnection<C> {
1811 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1812 let PostgresConnection {
1813 tunnel,
1814 host: _,
1816 port: _,
1817 database: _,
1818 user: _,
1819 password: _,
1820 tls_mode: _,
1821 tls_root_cert: _,
1822 tls_identity: _,
1823 } = self;
1824
1825 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1826
1827 for (compatible, field) in compatibility_checks {
1828 if !compatible {
1829 tracing::warn!(
1830 "PostgresConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1831 self,
1832 other
1833 );
1834
1835 return Err(AlterError { id });
1836 }
1837 }
1838 Ok(())
1839 }
1840}
1841
/// Specifies how to tunnel traffic for a connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Tunnel<C: ConnectionAccess = InlinedConnection> {
    /// Connect directly, with no tunneling.
    Direct,
    /// Connect through the specified SSH tunnel.
    Ssh(SshTunnel<C>),
    /// Connect through the specified AWS PrivateLink endpoint.
    AwsPrivatelink(AwsPrivatelink),
}
1852
1853impl<R: ConnectionResolver> IntoInlineConnection<Tunnel, R> for Tunnel<ReferencedConnection> {
1854 fn into_inline_connection(self, r: R) -> Tunnel {
1855 match self {
1856 Tunnel::Direct => Tunnel::Direct,
1857 Tunnel::Ssh(ssh) => Tunnel::Ssh(ssh.into_inline_connection(r)),
1858 Tunnel::AwsPrivatelink(awspl) => Tunnel::AwsPrivatelink(awspl),
1859 }
1860 }
1861}
1862
1863impl<C: ConnectionAccess> AlterCompatible for Tunnel<C> {
1864 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1865 let compatible = match (self, other) {
1866 (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o).is_ok(),
1867 (s, o) => s == o,
1868 };
1869
1870 if !compatible {
1871 tracing::warn!(
1872 "Tunnel incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
1873 self,
1874 other
1875 );
1876
1877 return Err(AlterError { id });
1878 }
1879
1880 Ok(())
1881 }
1882}
1883
/// The TLS validation level to use when connecting to a MySQL server.
///
/// See the mapping to `mysql_async::SslOpts` in
/// `MySqlConnection::config` for the exact behavior of each mode.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum MySqlSslMode {
    /// Do not use TLS.
    Disabled,
    /// Use TLS, but do not verify the server's certificate or hostname.
    Required,
    /// Use TLS and verify the server's certificate, but not its hostname.
    VerifyCa,
    /// Use TLS and verify both the server's certificate and hostname.
    VerifyIdentity,
}
1894
/// A connection to a MySQL server.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct MySqlConnection<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// The username to authenticate as.
    pub user: StringOrSecret,
    /// The ID of the secret containing the password, if any.
    pub password: Option<CatalogItemId>,
    /// A tunnel through which to route traffic to the server.
    pub tunnel: Tunnel<C>,
    /// The TLS validation level to use when connecting.
    pub tls_mode: MySqlSslMode,
    /// An optional root TLS certificate used to verify the server's
    /// certificate.
    pub tls_root_cert: Option<StringOrSecret>,
    /// An optional TLS client certificate for authentication.
    pub tls_identity: Option<TlsIdentity>,
    /// An optional AWS connection whose SDK configuration is loaded when
    /// connecting (see `MySqlConnection::config`).
    pub aws_connection: Option<AwsConnectionReference<C>>,
}
1919
1920impl<R: ConnectionResolver> IntoInlineConnection<MySqlConnection, R>
1921 for MySqlConnection<ReferencedConnection>
1922{
1923 fn into_inline_connection(self, r: R) -> MySqlConnection {
1924 let MySqlConnection {
1925 host,
1926 port,
1927 user,
1928 password,
1929 tunnel,
1930 tls_mode,
1931 tls_root_cert,
1932 tls_identity,
1933 aws_connection,
1934 } = self;
1935
1936 MySqlConnection {
1937 host,
1938 port,
1939 user,
1940 password,
1941 tunnel: tunnel.into_inline_connection(&r),
1942 tls_mode,
1943 tls_root_cert,
1944 tls_identity,
1945 aws_connection: aws_connection.map(|aws| aws.into_inline_connection(&r)),
1946 }
1947 }
1948}
1949
impl<C: ConnectionAccess> MySqlConnection<C> {
    /// Reports whether this connection type is validated by default when
    /// created. Always `true` for MySQL connections.
    fn validate_by_default(&self) -> bool {
        true
    }
}
1955
impl MySqlConnection<InlinedConnection> {
    /// Builds an [`mz_mysql_util::Config`] for this connection, reading any
    /// required secrets and resolving TLS and tunnel configuration.
    pub async fn config(
        &self,
        secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
        storage_configuration: &StorageConfiguration,
        in_task: InTask,
    ) -> Result<mz_mysql_util::Config, anyhow::Error> {
        let mut opts = mysql_async::OptsBuilder::default()
            .ip_or_hostname(&self.host)
            .tcp_port(self.port)
            .user(Some(&self.user.get_string(in_task, secrets_reader).await?));

        if let Some(password) = self.password {
            let password = secrets_reader
                .read_string_in_task_if(in_task, password)
                .await?;
            opts = opts.pass(Some(password));
        }

        // Map the declared TLS mode onto `mysql_async`'s SSL options:
        // `Required` skips both certificate and hostname verification,
        // `VerifyCa` skips only hostname verification, and `VerifyIdentity`
        // verifies both.
        let mut ssl_opts = match self.tls_mode {
            MySqlSslMode::Disabled => None,
            MySqlSslMode::Required => Some(
                mysql_async::SslOpts::default()
                    .with_danger_accept_invalid_certs(true)
                    .with_danger_skip_domain_validation(true),
            ),
            MySqlSslMode::VerifyCa => {
                Some(mysql_async::SslOpts::default().with_danger_skip_domain_validation(true))
            }
            MySqlSslMode::VerifyIdentity => Some(mysql_async::SslOpts::default()),
        };

        // A custom root certificate is only applied when certificate
        // verification is enabled.
        if matches!(
            self.tls_mode,
            MySqlSslMode::VerifyCa | MySqlSslMode::VerifyIdentity
        ) {
            if let Some(tls_root_cert) = &self.tls_root_cert {
                let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
                ssl_opts = ssl_opts.map(|opts| {
                    opts.with_root_certs(vec![tls_root_cert.as_bytes().to_vec().into()])
                });
            }
        }

        if let Some(identity) = &self.tls_identity {
            let key = secrets_reader
                .read_string_in_task_if(in_task, identity.key)
                .await?;
            let cert = identity.cert.get_string(in_task, secrets_reader).await?;
            // `mysql_async` takes the client identity as a PKCS#12 archive,
            // so convert the PEM key/cert pair first.
            let Pkcs12Archive { der, pass } =
                mz_tls_util::pkcs12der_from_pem(key.as_bytes(), cert.as_bytes())?;

            ssl_opts = ssl_opts.map(|opts| {
                opts.with_client_identity(Some(
                    mysql_async::ClientIdentity::new(der.into()).with_password(pass),
                ))
            });
        }

        opts = opts.ssl_opts(ssl_opts);

        let tunnel = match &self.tunnel {
            Tunnel::Direct => {
                // Resolve the hostname eagerly so the external-address policy
                // can be enforced on the resolved IPs.
                let resolved = resolve_address(
                    &self.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_mysql_util::TunnelConfig::Direct {
                    resolved_ips: Some(resolved),
                }
            }
            Tunnel::Ssh(SshTunnel {
                connection_id,
                connection,
            }) => {
                // The SSH key pair is stored as a secret.
                let secret = secrets_reader
                    .read_in_task_if(in_task, *connection_id)
                    .await?;
                let key_pair = SshKeyPair::from_bytes(&secret)?;
                let resolved = resolve_address(
                    &connection.host,
                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                )
                .await?;
                mz_mysql_util::TunnelConfig::Ssh {
                    config: SshTunnelConfig {
                        host: resolved
                            .iter()
                            .map(|a| a.to_string())
                            .collect::<BTreeSet<_>>(),
                        port: connection.port,
                        user: connection.user.clone(),
                        key_pair,
                    },
                }
            }
            Tunnel::AwsPrivatelink(connection) => {
                // MySQL tunnels do not support overriding the port.
                assert_none!(connection.port);
                mz_mysql_util::TunnelConfig::AwsPrivatelink {
                    connection_id: connection.connection_id,
                }
            }
        };

        // Load the AWS SDK configuration when an AWS connection reference is
        // attached.
        let aws_config = match self.aws_connection.as_ref() {
            None => None,
            Some(aws_ref) => Some(
                aws_ref
                    .connection
                    .load_sdk_config(
                        &storage_configuration.connection_context,
                        aws_ref.connection_id,
                        in_task,
                    )
                    .await?,
            ),
        };

        Ok(mz_mysql_util::Config::new(
            opts,
            tunnel,
            storage_configuration.parameters.ssh_timeout_config,
            in_task,
            storage_configuration
                .parameters
                .mysql_source_timeouts
                .clone(),
            aws_config,
        )?)
    }

    /// Validates the connection by connecting to the upstream MySQL server
    /// and checking the replication-related system settings. Returns the
    /// established connection on success.
    pub async fn validate(
        &self,
        _id: CatalogItemId,
        storage_configuration: &StorageConfiguration,
    ) -> Result<MySqlConn, MySqlConnectionValidationError> {
        let config = self
            .config(
                &storage_configuration.connection_context.secrets_reader,
                storage_configuration,
                // `InTask::No`: validation runs directly on the current task.
                InTask::No,
            )
            .await?;
        let mut conn = config
            .connect(
                "connection validation",
                &storage_configuration.connection_context.ssh_tunnel_manager,
            )
            .await?;

        // Collect all invalid settings so they can be reported together
        // rather than failing on the first one.
        let mut setting_errors = vec![];
        let gtid_res = mz_mysql_util::ensure_gtid_consistency(&mut conn).await;
        let binlog_res = mz_mysql_util::ensure_full_row_binlog_format(&mut conn).await;
        let order_res = mz_mysql_util::ensure_replication_commit_order(&mut conn).await;
        for res in [gtid_res, binlog_res, order_res] {
            match res {
                Err(MySqlError::InvalidSystemSetting {
                    setting,
                    expected,
                    actual,
                }) => {
                    setting_errors.push((setting, expected, actual));
                }
                // Any other error aborts validation immediately.
                Err(err) => Err(err)?,
                Ok(()) => {}
            }
        }
        if !setting_errors.is_empty() {
            Err(MySqlConnectionValidationError::ReplicationSettingsError(
                setting_errors,
            ))?;
        }

        Ok(conn)
    }
}
2143
/// Errors that can arise while validating a [`MySqlConnection`].
#[derive(Debug, thiserror::Error)]
pub enum MySqlConnectionValidationError {
    /// One or more replication-related system settings had unexpected values;
    /// each entry is `(setting, expected, actual)`.
    #[error("Invalid MySQL system replication settings")]
    ReplicationSettingsError(Vec<(String, String, String)>),
    /// An error from the MySQL client.
    #[error(transparent)]
    Client(#[from] MySqlError),
    /// Any other error.
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}
2153
2154impl MySqlConnectionValidationError {
2155 pub fn detail(&self) -> Option<String> {
2156 match self {
2157 Self::ReplicationSettingsError(settings) => Some(format!(
2158 "Invalid MySQL system replication settings: {}",
2159 itertools::join(
2160 settings.iter().map(|(setting, expected, actual)| format!(
2161 "{}: expected {}, got {}",
2162 setting, expected, actual
2163 )),
2164 "; "
2165 )
2166 )),
2167 _ => None,
2168 }
2169 }
2170
2171 pub fn hint(&self) -> Option<String> {
2172 match self {
2173 Self::ReplicationSettingsError(_) => {
2174 Some("Set the necessary MySQL database system settings.".into())
2175 }
2176 _ => None,
2177 }
2178 }
2179}
2180
2181impl<C: ConnectionAccess> AlterCompatible for MySqlConnection<C> {
2182 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2183 let MySqlConnection {
2184 tunnel,
2185 host: _,
2187 port: _,
2188 user: _,
2189 password: _,
2190 tls_mode: _,
2191 tls_root_cert: _,
2192 tls_identity: _,
2193 aws_connection: _,
2194 } = self;
2195
2196 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2197
2198 for (compatible, field) in compatibility_checks {
2199 if !compatible {
2200 tracing::warn!(
2201 "MySqlConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2202 self,
2203 other
2204 );
2205
2206 return Err(AlterError { id });
2207 }
2208 }
2209 Ok(())
2210 }
2211}
2212
/// Details for connecting to a Microsoft SQL Server instance.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SqlServerConnectionDetails<C: ConnectionAccess = InlinedConnection> {
    /// The hostname of the server.
    pub host: String,
    /// The port of the server.
    pub port: u16,
    /// The name of the database to connect to.
    pub database: String,
    /// The username to authenticate as.
    pub user: StringOrSecret,
    /// The ID of the secret containing the password.
    pub password: CatalogItemId,
    /// A tunnel through which to route traffic to the server.
    pub tunnel: Tunnel<C>,
    /// The level of encryption to use with the connection.
    pub encryption: mz_sql_server_util::config::EncryptionLevel,
    /// How to validate the server's TLS certificate.
    pub certificate_validation_policy: mz_sql_server_util::config::CertificateValidationPolicy,
    /// Root TLS certificate used to verify the server's certificate when the
    /// validation policy is `VerifyCA`.
    pub tls_root_cert: Option<StringOrSecret>,
}
2240
impl<C: ConnectionAccess> SqlServerConnectionDetails<C> {
    /// Reports whether this connection type is validated by default when
    /// created. Always `true` for SQL Server connections.
    fn validate_by_default(&self) -> bool {
        true
    }
}
2246
2247impl SqlServerConnectionDetails<InlinedConnection> {
2248 pub async fn validate(
2250 &self,
2251 _id: CatalogItemId,
2252 storage_configuration: &StorageConfiguration,
2253 ) -> Result<mz_sql_server_util::Client, anyhow::Error> {
2254 let config = self
2255 .resolve_config(
2256 &storage_configuration.connection_context.secrets_reader,
2257 storage_configuration,
2258 InTask::No,
2259 )
2260 .await?;
2261 tracing::debug!(?config, "Validating SQL Server connection");
2262
2263 let mut client = mz_sql_server_util::Client::connect(config).await?;
2264
2265 let mut replication_errors = vec![];
2270 for error in [
2271 mz_sql_server_util::inspect::ensure_database_cdc_enabled(&mut client).await,
2272 mz_sql_server_util::inspect::ensure_snapshot_isolation_enabled(&mut client).await,
2273 mz_sql_server_util::inspect::ensure_sql_server_agent_running(&mut client).await,
2274 ] {
2275 match error {
2276 Err(mz_sql_server_util::SqlServerError::InvalidSystemSetting {
2277 name,
2278 expected,
2279 actual,
2280 }) => replication_errors.push((name, expected, actual)),
2281 Err(other) => Err(other)?,
2282 Ok(()) => (),
2283 }
2284 }
2285 if !replication_errors.is_empty() {
2286 Err(SqlServerConnectionValidationError::ReplicationSettingsError(replication_errors))?;
2287 }
2288
2289 Ok(client)
2290 }
2291
2292 pub async fn resolve_config(
2302 &self,
2303 secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
2304 storage_configuration: &StorageConfiguration,
2305 in_task: InTask,
2306 ) -> Result<mz_sql_server_util::Config, anyhow::Error> {
2307 let dyncfg = storage_configuration.config_set();
2308 let mut inner_config = tiberius::Config::new();
2309
2310 inner_config.host(&self.host);
2312 inner_config.port(self.port);
2313 inner_config.database(self.database.clone());
2314 inner_config.encryption(self.encryption.into());
2315 match self.certificate_validation_policy {
2316 mz_sql_server_util::config::CertificateValidationPolicy::TrustAll => {
2317 inner_config.trust_cert()
2318 }
2319 mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA => {
2320 inner_config.trust_cert_ca_pem(
2321 self.tls_root_cert
2322 .as_ref()
2323 .unwrap()
2324 .get_string(in_task, secrets_reader)
2325 .await
2326 .context("ca certificate")?,
2327 );
2328 }
2329 mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem => (), }
2331
2332 inner_config.application_name("materialize");
2333
2334 let user = self
2336 .user
2337 .get_string(in_task, secrets_reader)
2338 .await
2339 .context("username")?;
2340 let password = secrets_reader
2341 .read_string_in_task_if(in_task, self.password)
2342 .await
2343 .context("password")?;
2344 inner_config.authentication(tiberius::AuthMethod::sql_server(user, password));
2347
2348 let enfoce_external_addresses = ENFORCE_EXTERNAL_ADDRESSES.get(dyncfg);
2351
2352 let tunnel = match &self.tunnel {
2353 Tunnel::Direct => mz_sql_server_util::config::TunnelConfig::Direct,
2354 Tunnel::Ssh(SshTunnel {
2355 connection_id,
2356 connection: ssh_connection,
2357 }) => {
2358 let secret = secrets_reader
2359 .read_in_task_if(in_task, *connection_id)
2360 .await
2361 .context("ssh secret")?;
2362 let key_pair = SshKeyPair::from_bytes(&secret).context("ssh key pair")?;
2363 let addresses = resolve_address(&ssh_connection.host, enfoce_external_addresses)
2366 .await
2367 .context("ssh tunnel")?;
2368
2369 let config = SshTunnelConfig {
2370 host: addresses.into_iter().map(|a| a.to_string()).collect(),
2371 port: ssh_connection.port,
2372 user: ssh_connection.user.clone(),
2373 key_pair,
2374 };
2375 mz_sql_server_util::config::TunnelConfig::Ssh {
2376 config,
2377 manager: storage_configuration
2378 .connection_context
2379 .ssh_tunnel_manager
2380 .clone(),
2381 timeout: storage_configuration.parameters.ssh_timeout_config.clone(),
2382 host: self.host.clone(),
2383 port: self.port,
2384 }
2385 }
2386 Tunnel::AwsPrivatelink(private_link_connection) => {
2387 assert_none!(private_link_connection.port);
2388 mz_sql_server_util::config::TunnelConfig::AwsPrivatelink {
2389 connection_id: private_link_connection.connection_id,
2390 port: self.port,
2391 }
2392 }
2393 };
2394
2395 Ok(mz_sql_server_util::Config::new(
2396 inner_config,
2397 tunnel,
2398 in_task,
2399 ))
2400 }
2401}
2402
/// Errors that can arise while validating a [`SqlServerConnectionDetails`].
#[derive(Debug, Clone, thiserror::Error)]
pub enum SqlServerConnectionValidationError {
    /// One or more replication-related system settings had unexpected values;
    /// each entry is `(setting, expected, actual)`.
    #[error("Invalid SQL Server system replication settings")]
    ReplicationSettingsError(Vec<(String, String, String)>),
}
2408
2409impl SqlServerConnectionValidationError {
2410 pub fn detail(&self) -> Option<String> {
2411 match self {
2412 Self::ReplicationSettingsError(settings) => Some(format!(
2413 "Invalid SQL Server system replication settings: {}",
2414 itertools::join(
2415 settings.iter().map(|(setting, expected, actual)| format!(
2416 "{}: expected {}, got {}",
2417 setting, expected, actual
2418 )),
2419 "; "
2420 )
2421 )),
2422 }
2423 }
2424
2425 pub fn hint(&self) -> Option<String> {
2426 match self {
2427 _ => None,
2428 }
2429 }
2430}
2431
2432impl<R: ConnectionResolver> IntoInlineConnection<SqlServerConnectionDetails, R>
2433 for SqlServerConnectionDetails<ReferencedConnection>
2434{
2435 fn into_inline_connection(self, r: R) -> SqlServerConnectionDetails {
2436 let SqlServerConnectionDetails {
2437 host,
2438 port,
2439 database,
2440 user,
2441 password,
2442 tunnel,
2443 encryption,
2444 certificate_validation_policy,
2445 tls_root_cert,
2446 } = self;
2447
2448 SqlServerConnectionDetails {
2449 host,
2450 port,
2451 database,
2452 user,
2453 password,
2454 tunnel: tunnel.into_inline_connection(&r),
2455 encryption,
2456 certificate_validation_policy,
2457 tls_root_cert,
2458 }
2459 }
2460}
2461
2462impl<C: ConnectionAccess> AlterCompatible for SqlServerConnectionDetails<C> {
2463 fn alter_compatible(
2464 &self,
2465 id: mz_repr::GlobalId,
2466 other: &Self,
2467 ) -> Result<(), crate::controller::AlterError> {
2468 let SqlServerConnectionDetails {
2469 tunnel,
2470 host: _,
2472 port: _,
2473 database: _,
2474 user: _,
2475 password: _,
2476 encryption: _,
2477 certificate_validation_policy: _,
2478 tls_root_cert: _,
2479 } = self;
2480
2481 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2482
2483 for (compatible, field) in compatibility_checks {
2484 if !compatible {
2485 tracing::warn!(
2486 "SqlServerConnectionDetails incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2487 self,
2488 other
2489 );
2490
2491 return Err(AlterError { id });
2492 }
2493 }
2494 Ok(())
2495 }
2496}
2497
/// Details of a connection to an SSH server used for tunneling (see
/// [`SshTunnel`]).
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SshConnection {
    /// Hostname of the SSH server; resolved to concrete addresses at
    /// connect/validate time.
    pub host: String,
    /// Port the SSH server listens on.
    pub port: u16,
    /// User to authenticate as.
    pub user: String,
}
2505
2506use self::inline::{
2507 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
2508 ReferencedConnection,
2509};
2510
impl AlterCompatible for SshConnection {
    /// `SshConnection` places no restrictions on `ALTER`: any field may
    /// change, so every pair of connections is unconditionally compatible.
    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        Ok(())
    }
}
2517
/// An AWS PrivateLink tunnel through which a connection's traffic is routed.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelink {
    /// Id of the AWS PrivateLink connection catalog item backing this tunnel.
    pub connection_id: CatalogItemId,
    /// Availability zone to use, if any.
    /// NOTE(review): semantics inferred from the name — confirm against the
    /// PrivateLink endpoint setup code.
    pub availability_zone: Option<String>,
    /// Optional port override. At least the SQL Server tunnel construction
    /// asserts this is `None` and falls back to the connection's own port;
    /// other users may supply an override.
    pub port: Option<u16>,
}
2529
2530impl AlterCompatible for AwsPrivatelink {
2531 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2532 let AwsPrivatelink {
2533 connection_id,
2534 availability_zone: _,
2535 port: _,
2536 } = self;
2537
2538 let compatibility_checks = [(connection_id == &other.connection_id, "connection_id")];
2539
2540 for (compatible, field) in compatibility_checks {
2541 if !compatible {
2542 tracing::warn!(
2543 "AwsPrivatelink incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2544 self,
2545 other
2546 );
2547
2548 return Err(AlterError { id });
2549 }
2550 }
2551
2552 Ok(())
2553 }
2554}
2555
/// An SSH tunnel: the catalog id of an SSH connection together with the
/// (referenced or inlined, per `C`) connection details themselves.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SshTunnel<C: ConnectionAccess = InlinedConnection> {
    /// Id of the SSH connection catalog item; this id also names the secret
    /// holding the tunnel's SSH key pair (read in `connect`).
    pub connection_id: CatalogItemId,
    /// The SSH connection details.
    pub connection: C::Ssh,
}
2564
2565impl<R: ConnectionResolver> IntoInlineConnection<SshTunnel, R> for SshTunnel<ReferencedConnection> {
2566 fn into_inline_connection(self, r: R) -> SshTunnel {
2567 let SshTunnel {
2568 connection,
2569 connection_id,
2570 } = self;
2571
2572 SshTunnel {
2573 connection: r.resolve_connection(connection).unwrap_ssh(),
2574 connection_id,
2575 }
2576 }
2577}
2578
impl SshTunnel<InlinedConnection> {
    /// Opens an SSH tunnel to `remote_host:remote_port` via this tunnel's SSH
    /// connection, using the connection context's `SshTunnelManager`.
    ///
    /// The SSH key pair is read from the secret identified by
    /// `self.connection_id`. `in_task` controls whether the secret read and
    /// the connect run in an isolated task.
    ///
    /// # Errors
    ///
    /// Fails if the SSH host cannot be resolved, the secret cannot be read or
    /// parsed as a key pair, or the tunnel cannot be established.
    async fn connect(
        &self,
        storage_configuration: &StorageConfiguration,
        remote_host: &str,
        remote_port: u16,
        in_task: InTask,
    ) -> Result<ManagedSshTunnelHandle, anyhow::Error> {
        // Resolve the SSH host to concrete addresses, optionally enforcing
        // that only external (non-private) addresses are permitted.
        let resolved = resolve_address(
            &self.connection.host,
            ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
        )
        .await?;
        storage_configuration
            .connection_context
            .ssh_tunnel_manager
            .connect(
                SshTunnelConfig {
                    host: resolved
                        .iter()
                        .map(|a| a.to_string())
                        .collect::<BTreeSet<_>>(),
                    port: self.connection.port,
                    user: self.connection.user.clone(),
                    // The key pair lives in the secret named by this
                    // connection's catalog id.
                    key_pair: SshKeyPair::from_bytes(
                        &storage_configuration
                            .connection_context
                            .secrets_reader
                            .read_in_task_if(in_task, self.connection_id)
                            .await?,
                    )?,
                },
                remote_host,
                remote_port,
                storage_configuration.parameters.ssh_timeout_config,
                in_task,
            )
            .await
    }
}
2622
2623impl<C: ConnectionAccess> AlterCompatible for SshTunnel<C> {
2624 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2625 let SshTunnel {
2626 connection_id,
2627 connection,
2628 } = self;
2629
2630 let compatibility_checks = [
2631 (connection_id == &other.connection_id, "connection_id"),
2632 (
2633 connection.alter_compatible(id, &other.connection).is_ok(),
2634 "connection",
2635 ),
2636 ];
2637
2638 for (compatible, field) in compatibility_checks {
2639 if !compatible {
2640 tracing::warn!(
2641 "SshTunnel incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2642 self,
2643 other
2644 );
2645
2646 return Err(AlterError { id });
2647 }
2648 }
2649
2650 Ok(())
2651 }
2652}
2653
2654impl SshConnection {
2655 #[allow(clippy::unused_async)]
2656 async fn validate(
2657 &self,
2658 id: CatalogItemId,
2659 storage_configuration: &StorageConfiguration,
2660 ) -> Result<(), anyhow::Error> {
2661 let secret = storage_configuration
2662 .connection_context
2663 .secrets_reader
2664 .read_in_task_if(
2665 InTask::No,
2667 id,
2668 )
2669 .await?;
2670 let key_pair = SshKeyPair::from_bytes(&secret)?;
2671
2672 let resolved = resolve_address(
2674 &self.host,
2675 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2676 )
2677 .await?;
2678
2679 let config = SshTunnelConfig {
2680 host: resolved
2681 .iter()
2682 .map(|a| a.to_string())
2683 .collect::<BTreeSet<_>>(),
2684 port: self.port,
2685 user: self.user.clone(),
2686 key_pair,
2687 };
2688 config
2691 .validate(storage_configuration.parameters.ssh_timeout_config)
2692 .await
2693 }
2694
2695 fn validate_by_default(&self) -> bool {
2696 false
2697 }
2698}
2699
2700impl AwsPrivatelinkConnection {
2701 #[allow(clippy::unused_async)]
2702 async fn validate(
2703 &self,
2704 id: CatalogItemId,
2705 storage_configuration: &StorageConfiguration,
2706 ) -> Result<(), anyhow::Error> {
2707 let Some(ref cloud_resource_reader) = storage_configuration
2708 .connection_context
2709 .cloud_resource_reader
2710 else {
2711 return Err(anyhow!("AWS PrivateLink connections are unsupported"));
2712 };
2713
2714 let status = cloud_resource_reader.read(id).await?;
2716
2717 let availability = status
2718 .conditions
2719 .as_ref()
2720 .and_then(|conditions| conditions.iter().find(|c| c.type_ == "Available"));
2721
2722 match availability {
2723 Some(condition) if condition.status == "True" => Ok(()),
2724 Some(condition) => Err(anyhow!("{}", condition.message)),
2725 None => Err(anyhow!("Endpoint availability is unknown")),
2726 }
2727 }
2728
2729 fn validate_by_default(&self) -> bool {
2730 false
2731 }
2732}