1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::net::SocketAddr;
15use std::sync::Arc;
16use std::time::SystemTime;
17
18use anyhow::{Context, anyhow};
19use async_trait::async_trait;
20use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider};
21use aws_sigv4::http_request::{SignableBody, SignableRequest, SigningSettings, sign};
22use aws_sigv4::sign::v4;
23use aws_smithy_runtime_api::client::identity::Identity as AwsIdentity;
25use base64::Engine;
26use http::{HeaderName, HeaderValue};
27use iceberg::Catalog;
28use iceberg::CatalogBuilder;
29use iceberg::io::{
30 GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, GCS_DISABLE_VM_METADATA, GCS_USER_PROJECT,
31 S3_ACCESS_KEY_ID, S3_DISABLE_EC2_METADATA, S3_REGION, S3_SECRET_ACCESS_KEY,
32};
33use iceberg_catalog_rest::{
34 REST_CATALOG_PROP_URI, REST_CATALOG_PROP_WAREHOUSE, RequestAuthenticator, RestCatalogBuilder,
35};
36use iceberg_storage_opendal::{
37 AwsCredential, AwsCredentialLoad, CustomAwsCredentialLoader, OpenDalStorageFactory,
38};
39use itertools::Itertools;
40use mz_ccsr::tls::{Certificate, Identity};
41use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceReader, vpc_endpoint_host};
42use mz_dyncfg::ConfigSet;
43use mz_kafka_util::client::{
44 BrokerAddr, BrokerRewrite, HostMappingRules, MzClientContext, MzKafkaError, TunnelConfig,
45 TunnelingClientContext,
46};
47use mz_mysql_util::{MySqlConn, MySqlError};
48use mz_ore::assert_none;
49use mz_ore::error::ErrorExt;
50use mz_ore::future::{InTask, OreFutureExt};
51use mz_ore::netio::resolve_address;
52use mz_ore::num::NonNeg;
53use mz_repr::{CatalogItemId, GlobalId};
54use mz_secrets::SecretsReader;
55use mz_sql_parser::ast::ConnectionRulePattern;
56use mz_ssh_util::keys::SshKeyPair;
57use mz_ssh_util::tunnel::SshTunnelConfig;
58use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager};
59use mz_tracing::CloneableEnvFilter;
60use rdkafka::ClientContext;
61use rdkafka::config::FromClientConfigAndContext;
62use rdkafka::consumer::{BaseConsumer, Consumer};
63use regex::Regex;
64use reqwest::Request;
65use serde::{Deserialize, Deserializer, Serialize};
66use tokio::net;
67use tokio::runtime::Handle;
68use tokio_postgres::config::SslMode;
69use tracing::{debug, info, warn};
70use url::Url;
71
72use crate::AlterCompatible;
73use crate::configuration::StorageConfiguration;
74use crate::connections::aws::{
75 AwsAuth, AwsConnection, AwsConnectionReference, AwsConnectionValidationError,
76};
77use crate::connections::gcp::{GcpConnectionReference, GcpTokenProvider};
78use crate::connections::string_or_secret::StringOrSecret;
79use crate::controller::AlterError;
80use crate::dyncfgs::{
81 ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES,
82 KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM, KAFKA_RECONNECT_BACKOFF,
83 KAFKA_RECONNECT_BACKOFF_MAX, KAFKA_RETRY_BACKOFF, KAFKA_RETRY_BACKOFF_MAX,
84};
85use crate::errors::{ContextCreationError, CsrConnectError};
86
87pub mod aws;
88pub mod gcp;
89pub mod inline;
90pub mod string_or_secret;
91
/// REST catalog property under which the OAuth `scope` is passed.
const REST_CATALOG_PROP_SCOPE: &str = "scope";
/// REST catalog property under which the OAuth `credential` is passed.
const REST_CATALOG_PROP_CREDENTIAL: &str = "credential";
94
/// Adapts an AWS SDK credentials provider to the `AwsCredentialLoad` interface used by
/// the OpenDAL-backed Iceberg storage (FileIO).
struct AwsSdkCredentialLoader {
    /// The SDK provider that is queried every time FileIO asks for credentials.
    provider: SharedCredentialsProvider,
}
107
108impl AwsSdkCredentialLoader {
109 fn new(provider: SharedCredentialsProvider) -> Self {
110 Self { provider }
111 }
112}
113
114#[async_trait]
115impl AwsCredentialLoad for AwsSdkCredentialLoader {
116 async fn load_credential(
117 &self,
118 _client: reqwest::Client,
119 ) -> anyhow::Result<Option<AwsCredential>> {
120 let creds = self
121 .provider
122 .provide_credentials()
123 .await
124 .map_err(|e| {
125 warn!(
126 error = %e.display_with_causes(),
127 "failed to load AWS credentials for Iceberg FileIO from SDK provider"
128 );
129 e
130 })
131 .context(
132 "failed to load AWS credentials from SDK provider for Iceberg FileIO \
133 (credential source may be temporarily unavailable)",
134 )?;
135
136 Ok(Some(AwsCredential {
137 access_key_id: creds.access_key_id().to_string(),
138 secret_access_key: creds.secret_access_key().to_string(),
139 session_token: creds.session_token().map(|s| s.to_string()),
140 expires_in: creds.expiry().map(|t| t.into()),
141 }))
142 }
143}
144
/// An Iceberg REST catalog request authenticator that signs every request with AWS
/// Signature Version 4.
struct Sigv4Authenticator {
    /// Source of the AWS credentials used for signing; queried on every request.
    provider: SharedCredentialsProvider,
    /// AWS region used in the signing scope.
    region: String,
    /// Service signing name (for example `s3tables`).
    signing_name: String,
}
156
157impl std::fmt::Debug for Sigv4Authenticator {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 f.debug_struct("Sigv4Authenticator")
160 .field("region", &self.region)
161 .field("signing_name", &self.signing_name)
162 .finish_non_exhaustive()
163 }
164}
165
166fn sigv4_err(e: impl Into<anyhow::Error>) -> iceberg::Error {
167 iceberg::Error::new(iceberg::ErrorKind::DataInvalid, "AWS SigV4").with_source(e)
168}
169
170#[async_trait]
171impl RequestAuthenticator for Sigv4Authenticator {
172 async fn authenticate_request(&self, req: &mut Request) -> iceberg::Result<()> {
173 let creds = self
174 .provider
175 .provide_credentials()
176 .await
177 .map_err(sigv4_err)?;
178 let identity: AwsIdentity = creds.into();
179 let params = v4::SigningParams::builder()
180 .identity(&identity)
181 .region(&self.region)
182 .name(&self.signing_name)
183 .time(SystemTime::now())
184 .settings(SigningSettings::default())
185 .build()
186 .map_err(sigv4_err)?
187 .into();
188 let body: &[u8] = req
189 .body()
190 .map(|b| match b.as_bytes() {
191 Some(b) => Ok(b),
192 None => Err(iceberg::Error::new(
193 iceberg::ErrorKind::FeatureUnsupported,
194 "SigV4 Authenticator cannot sign a streaming request body.",
195 )),
196 })
197 .transpose()?
198 .unwrap_or_default();
199 let headers = req
200 .headers()
201 .iter()
202 .map(|(k, v)| {
203 Ok((
204 k.as_str(),
205 v.to_str().map_err(|_| {
206 iceberg::Error::new(
207 iceberg::ErrorKind::DataInvalid,
208 format!("header '{}' value is not all visible ASCII", k),
209 )
210 })?,
211 ))
212 })
213 .collect::<iceberg::Result<Vec<(&str, &str)>>>()?;
214 let signable = SignableRequest::new(
215 req.method().as_str(),
216 req.url().as_str(),
217 headers.into_iter(),
218 SignableBody::Bytes(body),
219 )
220 .map_err(sigv4_err)?;
221 let (instructions, _sig) = sign(signable, ¶ms).map_err(sigv4_err)?.into_parts();
222 let (new_headers, new_query) = instructions.into_parts();
223 for header in new_headers {
224 let mut value = HeaderValue::from_str(header.value()).map_err(sigv4_err)?;
225 value.set_sensitive(header.sensitive());
226 req.headers_mut()
227 .insert(HeaderName::from_static(header.name()), value);
228 }
229 if !new_query.is_empty() {
230 let url = req.url_mut();
231 let mut pairs = url.query_pairs_mut();
232 for (name, value) in new_query {
233 pairs.append_pair(name, &value);
234 }
235 }
236 Ok(())
237 }
238
239 async fn invalidate_cache(&self) -> iceberg::Result<()> {
241 Ok(())
242 }
243 async fn regenerate_cache(&self) -> iceberg::Result<()> {
244 Ok(())
245 }
246}
247
#[async_trait::async_trait]
/// Extension methods for reading secrets, optionally on a separate task as selected
/// by an `InTask` flag.
trait SecretsReaderExt {
    /// Reads the raw bytes of secret `id`, running the read in a task when `in_task`
    /// requests it.
    async fn read_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<Vec<u8>, anyhow::Error>;

    /// Like `read_in_task_if`, but returns the secret as a `String`.
    async fn read_string_in_task_if(
        &self,
        in_task: InTask,
        id: CatalogItemId,
    ) -> Result<String, anyhow::Error>;
}
265
266#[async_trait::async_trait]
267impl SecretsReaderExt for Arc<dyn SecretsReader> {
268 async fn read_in_task_if(
269 &self,
270 in_task: InTask,
271 id: CatalogItemId,
272 ) -> Result<Vec<u8>, anyhow::Error> {
273 let sr = Arc::clone(self);
274 async move { sr.read(id).await }
275 .run_in_task_if(in_task, || "secrets_reader_read".to_string())
276 .await
277 }
278 async fn read_string_in_task_if(
279 &self,
280 in_task: InTask,
281 id: CatalogItemId,
282 ) -> Result<String, anyhow::Error> {
283 let sr = Arc::clone(self);
284 async move { sr.read_string(id).await }
285 .run_in_task_if(in_task, || "secrets_reader_read".to_string())
286 .await
287 }
288}
289
/// Environment-wide context needed to construct and validate connections to external
/// systems.
#[derive(Debug, Clone)]
pub struct ConnectionContext {
    /// The environment ID. Among other uses, it appears in default Kafka progress topic
    /// names and in Kafka ID bases.
    pub environment_id: String,
    /// Log level for librdkafka. `from_cli_args` derives it from the `librdkafka`
    /// crate level of the startup log filter.
    pub librdkafka_log_level: tracing::Level,
    /// Prefix for AWS external IDs, if configured.
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    /// ARN of the IAM role used for AWS connections, if configured.
    pub aws_connection_role_arn: Option<String>,
    /// Reader for secrets referenced by connections.
    pub secrets_reader: Arc<dyn SecretsReader>,
    /// Reader for cloud resources, if available.
    pub cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
    /// Manager for the SSH tunnels used by connections.
    pub ssh_tunnel_manager: SshTunnelManager,
}
317
318impl ConnectionContext {
319 pub fn from_cli_args(
327 environment_id: String,
328 startup_log_level: &CloneableEnvFilter,
329 aws_external_id_prefix: Option<AwsExternalIdPrefix>,
330 aws_connection_role_arn: Option<String>,
331 secrets_reader: Arc<dyn SecretsReader>,
332 cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
333 ) -> ConnectionContext {
334 ConnectionContext {
335 environment_id,
336 librdkafka_log_level: mz_ore::tracing::crate_level(
337 &startup_log_level.clone().into(),
338 "librdkafka",
339 ),
340 aws_external_id_prefix,
341 aws_connection_role_arn,
342 secrets_reader,
343 cloud_resource_reader,
344 ssh_tunnel_manager: SshTunnelManager::default(),
345 }
346 }
347
348 pub fn for_tests(secrets_reader: Arc<dyn SecretsReader>) -> ConnectionContext {
350 ConnectionContext {
351 environment_id: "test-environment-id".into(),
352 librdkafka_log_level: tracing::Level::INFO,
353 aws_external_id_prefix: Some(
354 AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable(
355 "test-aws-external-id-prefix",
356 )
357 .expect("infallible"),
358 ),
359 aws_connection_role_arn: Some(
360 "arn:aws:iam::123456789000:role/MaterializeConnection".into(),
361 ),
362 secrets_reader,
363 cloud_resource_reader: None,
364 ssh_tunnel_manager: SshTunnelManager::default(),
365 }
366 }
367}
368
/// A connection to an external system.
///
/// `C` selects how references to other connections are represented. The default,
/// `InlinedConnection`, holds them inlined.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Connection<C: ConnectionAccess = InlinedConnection> {
    /// A Kafka cluster.
    Kafka(KafkaConnection<C>),
    /// A Confluent-compatible schema registry.
    Csr(CsrConnection<C>),
    /// An AWS Glue Schema Registry.
    GlueSchemaRegistry(GlueSchemaRegistryConnection<C>),
    /// A PostgreSQL database.
    Postgres(PostgresConnection<C>),
    /// An SSH tunnel endpoint.
    Ssh(SshConnection),
    /// AWS credentials and settings.
    Aws(AwsConnection),
    /// An AWS PrivateLink endpoint service.
    AwsPrivatelink(AwsPrivatelinkConnection),
    /// GCP credentials and settings.
    Gcp(gcp::GcpConnection),
    /// A MySQL database.
    MySql(MySqlConnection<C>),
    /// A SQL Server database.
    SqlServer(SqlServerConnectionDetails<C>),
    /// An Iceberg catalog.
    IcebergCatalog(IcebergCatalogConnection<C>),
}
383
384impl<R: ConnectionResolver> IntoInlineConnection<Connection, R>
385 for Connection<ReferencedConnection>
386{
387 fn into_inline_connection(self, r: R) -> Connection {
388 match self {
389 Connection::Kafka(kafka) => Connection::Kafka(kafka.into_inline_connection(r)),
390 Connection::Csr(csr) => Connection::Csr(csr.into_inline_connection(r)),
391 Connection::GlueSchemaRegistry(glue) => {
392 Connection::GlueSchemaRegistry(glue.into_inline_connection(r))
393 }
394 Connection::Postgres(pg) => Connection::Postgres(pg.into_inline_connection(r)),
395 Connection::Ssh(ssh) => Connection::Ssh(ssh),
396 Connection::Aws(aws) => Connection::Aws(aws),
397 Connection::AwsPrivatelink(awspl) => Connection::AwsPrivatelink(awspl),
398 Connection::Gcp(gcp) => Connection::Gcp(gcp),
399 Connection::MySql(mysql) => Connection::MySql(mysql.into_inline_connection(r)),
400 Connection::SqlServer(sql_server) => {
401 Connection::SqlServer(sql_server.into_inline_connection(r))
402 }
403 Connection::IcebergCatalog(iceberg) => {
404 Connection::IcebergCatalog(iceberg.into_inline_connection(r))
405 }
406 }
407 }
408}
409
410impl<C: ConnectionAccess> Connection<C> {
411 pub fn validate_by_default(&self) -> bool {
413 match self {
414 Connection::Kafka(conn) => conn.validate_by_default(),
415 Connection::Csr(conn) => conn.validate_by_default(),
416 Connection::GlueSchemaRegistry(conn) => conn.validate_by_default(),
417 Connection::Postgres(conn) => conn.validate_by_default(),
418 Connection::Ssh(conn) => conn.validate_by_default(),
419 Connection::Aws(conn) => conn.validate_by_default(),
420 Connection::AwsPrivatelink(conn) => conn.validate_by_default(),
421 Connection::Gcp(conn) => conn.validate_by_default(),
422 Connection::MySql(conn) => conn.validate_by_default(),
423 Connection::SqlServer(conn) => conn.validate_by_default(),
424 Connection::IcebergCatalog(conn) => conn.validate_by_default(),
425 }
426 }
427}
428
429impl Connection<InlinedConnection> {
430 pub async fn validate(
432 &self,
433 id: CatalogItemId,
434 storage_configuration: &StorageConfiguration,
435 ) -> Result<(), ConnectionValidationError> {
436 match self {
437 Connection::Kafka(conn) => conn.validate(id, storage_configuration).await?,
438 Connection::Csr(conn) => conn.validate(id, storage_configuration).await?,
439 Connection::GlueSchemaRegistry(conn) => {
440 conn.validate(id, storage_configuration).await?
441 }
442 Connection::Postgres(conn) => {
443 conn.validate(id, storage_configuration).await?;
444 }
445 Connection::Ssh(conn) => conn.validate(id, storage_configuration).await?,
446 Connection::Aws(conn) => conn.validate(id, storage_configuration).await?,
447 Connection::AwsPrivatelink(conn) => conn.validate(id, storage_configuration).await?,
448 Connection::Gcp(conn) => conn.validate(id, storage_configuration).await?,
449 Connection::MySql(conn) => {
450 conn.validate(id, storage_configuration).await?;
451 }
452 Connection::SqlServer(conn) => {
453 conn.validate(id, storage_configuration).await?;
454 }
455 Connection::IcebergCatalog(conn) => conn.validate(id, storage_configuration).await?,
456 }
457 Ok(())
458 }
459
460 pub fn unwrap_kafka(self) -> <InlinedConnection as ConnectionAccess>::Kafka {
461 match self {
462 Self::Kafka(conn) => conn,
463 o => unreachable!("{o:?} is not a Kafka connection"),
464 }
465 }
466
467 pub fn unwrap_pg(self) -> <InlinedConnection as ConnectionAccess>::Pg {
468 match self {
469 Self::Postgres(conn) => conn,
470 o => unreachable!("{o:?} is not a Postgres connection"),
471 }
472 }
473
474 pub fn unwrap_mysql(self) -> <InlinedConnection as ConnectionAccess>::MySql {
475 match self {
476 Self::MySql(conn) => conn,
477 o => unreachable!("{o:?} is not a MySQL connection"),
478 }
479 }
480
481 pub fn unwrap_sql_server(self) -> <InlinedConnection as ConnectionAccess>::SqlServer {
482 match self {
483 Self::SqlServer(conn) => conn,
484 o => unreachable!("{o:?} is not a SQL Server connection"),
485 }
486 }
487
488 pub fn unwrap_aws(self) -> <InlinedConnection as ConnectionAccess>::Aws {
489 match self {
490 Self::Aws(conn) => conn,
491 o => unreachable!("{o:?} is not an AWS connection"),
492 }
493 }
494
495 pub fn unwrap_gcp(self) -> <InlinedConnection as ConnectionAccess>::Gcp {
496 match self {
497 Self::Gcp(conn) => conn,
498 o => unreachable!("{o:?} is not a GCP connection"),
499 }
500 }
501
502 pub fn unwrap_ssh(self) -> <InlinedConnection as ConnectionAccess>::Ssh {
503 match self {
504 Self::Ssh(conn) => conn,
505 o => unreachable!("{o:?} is not an SSH connection"),
506 }
507 }
508
509 pub fn unwrap_csr(self) -> <InlinedConnection as ConnectionAccess>::Csr {
510 match self {
511 Self::Csr(conn) => conn,
512 o => unreachable!("{o:?} is not a Kafka connection"),
513 }
514 }
515
516 pub fn unwrap_glue_schema_registry(
517 self,
518 ) -> <InlinedConnection as ConnectionAccess>::GlueSchemaRegistry {
519 match self {
520 Self::GlueSchemaRegistry(conn) => conn,
521 o => unreachable!("{o:?} is not an AWS Glue Schema Registry connection"),
522 }
523 }
524
525 pub fn unwrap_iceberg_catalog(self) -> <InlinedConnection as ConnectionAccess>::IcebergCatalog {
526 match self {
527 Self::IcebergCatalog(conn) => conn,
528 o => unreachable!("{o:?} is not an Iceberg catalog connection"),
529 }
530 }
531}
532
/// An error produced while validating a connection.
#[derive(thiserror::Error, Debug)]
pub enum ConnectionValidationError {
    /// A PostgreSQL connection failed validation.
    #[error(transparent)]
    Postgres(#[from] PostgresConnectionValidationError),
    /// A MySQL connection failed validation.
    #[error(transparent)]
    MySql(#[from] MySqlConnectionValidationError),
    /// A SQL Server connection failed validation.
    #[error(transparent)]
    SqlServer(#[from] SqlServerConnectionValidationError),
    /// An AWS connection failed validation.
    #[error(transparent)]
    Aws(#[from] AwsConnectionValidationError),
    /// A GCP connection failed validation.
    #[error(transparent)]
    Gcp(#[from] gcp::GcpConnectionValidationError),
    /// Any other failure. It is displayed together with its full chain of causes.
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}
549
550impl ConnectionValidationError {
551 pub fn detail(&self) -> Option<String> {
553 match self {
554 ConnectionValidationError::Postgres(e) => e.detail(),
555 ConnectionValidationError::MySql(e) => e.detail(),
556 ConnectionValidationError::SqlServer(e) => e.detail(),
557 ConnectionValidationError::Aws(e) => e.detail(),
558 ConnectionValidationError::Gcp(e) => e.detail(),
559 ConnectionValidationError::Other(_) => None,
560 }
561 }
562
563 pub fn hint(&self) -> Option<String> {
565 match self {
566 ConnectionValidationError::Postgres(e) => e.hint(),
567 ConnectionValidationError::MySql(e) => e.hint(),
568 ConnectionValidationError::SqlServer(e) => e.hint(),
569 ConnectionValidationError::Aws(e) => e.hint(),
570 ConnectionValidationError::Gcp(e) => e.hint(),
571 ConnectionValidationError::Other(_) => None,
572 }
573 }
574}
575
576impl<C: ConnectionAccess> AlterCompatible for Connection<C> {
577 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
578 match (self, other) {
579 (Self::Aws(s), Self::Aws(o)) => s.alter_compatible(id, o),
580 (Self::AwsPrivatelink(s), Self::AwsPrivatelink(o)) => s.alter_compatible(id, o),
581 (Self::Gcp(s), Self::Gcp(o)) => s.alter_compatible(id, o),
582 (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o),
583 (Self::Csr(s), Self::Csr(o)) => s.alter_compatible(id, o),
584 (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
585 (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
586 (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
587 _ => {
588 tracing::warn!(
589 "Connection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
590 self,
591 other
592 );
593 Err(AlterError { id })
594 }
595 }
596 }
597}
598
/// How to authenticate to a generic REST Iceberg catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogAuth<C: ConnectionAccess = InlinedConnection> {
    /// OAuth client credentials.
    OAuth {
        /// The credential, passed to the catalog as the `credential` property.
        credential: StringOrSecret,
        /// Optional OAuth scope, passed to the catalog as the `scope` property.
        scope: Option<String>,
    },
    /// A GCP service account, taken from the referenced GCP connection.
    Gcp(GcpConnectionReference<C>),
}
611
/// Settings for a generic REST Iceberg catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct RestIcebergCatalog<C: ConnectionAccess = InlinedConnection> {
    /// How to authenticate to the catalog.
    pub auth: IcebergCatalogAuth<C>,
    /// Optional warehouse, passed as the REST catalog `warehouse` property.
    pub warehouse: Option<String>,
}
618
/// Settings for an AWS S3 Tables Iceberg catalog, accessed over its REST endpoint.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct S3TablesRestIcebergCatalog<C: ConnectionAccess = InlinedConnection> {
    /// AWS connection that supplies the credentials and region.
    pub aws_connection: AwsConnectionReference<C>,
    /// Warehouse, passed as the REST catalog `warehouse` property.
    pub warehouse: String,
}
626
627impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogAuth, R>
628 for IcebergCatalogAuth<ReferencedConnection>
629{
630 fn into_inline_connection(self, r: R) -> IcebergCatalogAuth {
631 match self {
632 IcebergCatalogAuth::Gcp(x) => IcebergCatalogAuth::Gcp(x.into_inline_connection(&r)),
633 IcebergCatalogAuth::OAuth { credential, scope } => {
634 IcebergCatalogAuth::OAuth { credential, scope }
635 }
636 }
637 }
638}
639
640impl<R: ConnectionResolver> IntoInlineConnection<RestIcebergCatalog, R>
641 for RestIcebergCatalog<ReferencedConnection>
642{
643 fn into_inline_connection(self, r: R) -> RestIcebergCatalog {
644 RestIcebergCatalog {
645 auth: self.auth.into_inline_connection(&r),
646 warehouse: self.warehouse,
647 }
648 }
649}
650
651impl<R: ConnectionResolver> IntoInlineConnection<S3TablesRestIcebergCatalog, R>
652 for S3TablesRestIcebergCatalog<ReferencedConnection>
653{
654 fn into_inline_connection(self, r: R) -> S3TablesRestIcebergCatalog {
655 S3TablesRestIcebergCatalog {
656 aws_connection: self.aws_connection.into_inline_connection(&r),
657 warehouse: self.warehouse,
658 }
659 }
660}
661
/// The kind of an Iceberg catalog, without its settings. See
/// `IcebergCatalogConnection::catalog_type`.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogType {
    /// A generic REST catalog.
    Rest,
    /// An AWS S3 Tables catalog accessed over REST.
    S3TablesRest,
}
667
/// The kind of an Iceberg catalog, together with its kind-specific settings.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum IcebergCatalogImpl<C: ConnectionAccess = InlinedConnection> {
    /// A generic REST catalog.
    Rest(RestIcebergCatalog<C>),
    /// An AWS S3 Tables catalog accessed over REST.
    S3TablesRest(S3TablesRestIcebergCatalog<C>),
}
673
674impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogImpl, R>
675 for IcebergCatalogImpl<ReferencedConnection>
676{
677 fn into_inline_connection(self, r: R) -> IcebergCatalogImpl {
678 match self {
679 IcebergCatalogImpl::Rest(rest) => {
680 IcebergCatalogImpl::Rest(rest.into_inline_connection(r))
681 }
682 IcebergCatalogImpl::S3TablesRest(s3tables) => {
683 IcebergCatalogImpl::S3TablesRest(s3tables.into_inline_connection(r))
684 }
685 }
686 }
687}
688
/// A connection to an Iceberg catalog.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergCatalogConnection<C: ConnectionAccess = InlinedConnection> {
    /// The catalog kind and its kind-specific settings.
    pub catalog: IcebergCatalogImpl<C>,
    /// The catalog endpoint, passed as the REST catalog `uri` property.
    pub uri: reqwest::Url,
}
696
697impl AlterCompatible for IcebergCatalogConnection {
698 fn alter_compatible(&self, id: GlobalId, _other: &Self) -> Result<(), AlterError> {
699 Err(AlterError { id })
700 }
701}
702
703impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogConnection, R>
704 for IcebergCatalogConnection<ReferencedConnection>
705{
706 fn into_inline_connection(self, r: R) -> IcebergCatalogConnection {
707 IcebergCatalogConnection {
708 catalog: self.catalog.into_inline_connection(&r),
709 uri: self.uri,
710 }
711 }
712}
713
impl<C: ConnectionAccess> IcebergCatalogConnection<C> {
    /// Iceberg catalog connections are always validated by default.
    fn validate_by_default(&self) -> bool {
        true
    }
}
719
720impl IcebergCatalogConnection<InlinedConnection> {
721 pub async fn connect(
722 &self,
723 storage_configuration: &StorageConfiguration,
724 in_task: InTask,
725 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
726 match self.catalog {
727 IcebergCatalogImpl::S3TablesRest(ref s3tables) => {
728 self.connect_s3tables(s3tables, storage_configuration, in_task)
729 .await
730 }
731 IcebergCatalogImpl::Rest(ref rest) => {
732 self.connect_rest(rest, storage_configuration, in_task)
733 .await
734 }
735 }
736 }
737
738 pub fn catalog_type(&self) -> IcebergCatalogType {
739 match self.catalog {
740 IcebergCatalogImpl::S3TablesRest(_) => IcebergCatalogType::S3TablesRest,
741 IcebergCatalogImpl::Rest(_) => IcebergCatalogType::Rest,
742 }
743 }
744
745 pub fn s3tables_catalog(&self) -> Option<&S3TablesRestIcebergCatalog> {
746 match &self.catalog {
747 IcebergCatalogImpl::S3TablesRest(s3tables) => Some(s3tables),
748 IcebergCatalogImpl::Rest(_) => None,
749 }
750 }
751
752 pub fn rest_catalog(&self) -> Option<&RestIcebergCatalog> {
753 match &self.catalog {
754 IcebergCatalogImpl::Rest(rest) => Some(rest),
755 IcebergCatalogImpl::S3TablesRest(_) => None,
756 }
757 }
758
759 async fn connect_s3tables(
760 &self,
761 s3tables: &S3TablesRestIcebergCatalog,
762 storage_configuration: &StorageConfiguration,
763 in_task: InTask,
764 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
765 let secret_reader = &storage_configuration.connection_context.secrets_reader;
766 let aws_ref = &s3tables.aws_connection;
767 let aws_config = aws_ref
768 .connection
769 .load_sdk_config(
770 &storage_configuration.connection_context,
771 aws_ref.connection_id,
772 in_task,
773 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
774 )
775 .await
776 .with_context(|| {
777 format!(
778 "failed to load AWS SDK config for S3 Tables Iceberg catalog \
779 (connection id: {}, auth method: {}, catalog uri: {}, warehouse: {})",
780 aws_ref.connection_id,
781 aws_ref.connection.auth_method(),
782 self.uri,
783 s3tables.warehouse
784 )
785 })?;
786
787 let aws_region = aws_ref
788 .connection
789 .region
790 .clone()
791 .unwrap_or_else(|| "us-east-1".to_string());
792
793 let mut props = vec![
794 (S3_REGION.to_string(), aws_region.clone()),
795 (S3_DISABLE_EC2_METADATA.to_string(), "true".to_string()),
796 (
797 REST_CATALOG_PROP_WAREHOUSE.to_string(),
798 s3tables.warehouse.clone(),
799 ),
800 (REST_CATALOG_PROP_URI.to_string(), self.uri.to_string()),
801 ];
802
803 let aws_auth = aws_ref.connection.auth.clone();
804
805 if let AwsAuth::Credentials(creds) = &aws_auth {
806 props.push((
807 S3_ACCESS_KEY_ID.to_string(),
808 creds
809 .access_key_id
810 .get_string(in_task, secret_reader)
811 .await?,
812 ));
813 props.push((
814 S3_SECRET_ACCESS_KEY.to_string(),
815 secret_reader.read_string(creds.secret_access_key).await?,
816 ));
817 }
818
819 let credentials_provider = aws_config
823 .credentials_provider()
824 .ok_or_else(|| anyhow!("aws_config missing credentials provider"))?;
825
826 let authenticator = Arc::new(Sigv4Authenticator {
827 provider: credentials_provider.clone(),
828 region: aws_region.clone(),
829 signing_name: "s3tables".to_string(),
830 });
831
832 let customized_credential_load = if matches!(aws_auth, AwsAuth::AssumeRole(_)) {
835 Some(CustomAwsCredentialLoader::new(Arc::new(
836 AwsSdkCredentialLoader::new(credentials_provider),
837 )))
838 } else {
839 None
840 };
841
842 let storage_factory = Arc::new(OpenDalStorageFactory::S3 {
843 configured_scheme: "s3".to_string(),
844 customized_credential_load,
845 });
846
847 let catalog = RestCatalogBuilder::default()
848 .with_storage_factory(storage_factory)
849 .with_authenticator(authenticator)
850 .load("IcebergCatalog", props.into_iter().collect())
851 .await
852 .with_context(|| {
853 format!(
854 "failed to create S3 Tables Iceberg catalog \
855 (connection id: {}, catalog uri: {}, warehouse: {})",
856 aws_ref.connection_id, self.uri, s3tables.warehouse
857 )
858 })?;
859
860 Ok(Arc::new(catalog))
861 }
862
863 async fn connect_rest(
864 &self,
865 rest: &RestIcebergCatalog,
866 storage_configuration: &StorageConfiguration,
867 in_task: InTask,
868 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
869 let mut props = BTreeMap::from([(
870 REST_CATALOG_PROP_URI.to_string(),
871 self.uri.to_string().clone(),
872 )]);
873
874 if let Some(warehouse) = &rest.warehouse {
875 props.insert(REST_CATALOG_PROP_WAREHOUSE.to_string(), warehouse.clone());
876 }
877
878 let (storage_factory, custom_authenticator) = match &rest.auth {
882 IcebergCatalogAuth::OAuth { credential, scope } => {
883 let credential = credential
884 .get_string(
885 in_task,
886 &storage_configuration.connection_context.secrets_reader,
887 )
888 .await
889 .map_err(|e| anyhow!("failed to read Iceberg catalog credential: {e}"))?;
890 props.insert(REST_CATALOG_PROP_CREDENTIAL.to_string(), credential);
891
892 if let Some(scope) = scope {
893 props.insert(REST_CATALOG_PROP_SCOPE.to_string(), scope.clone());
894 }
895 (
896 OpenDalStorageFactory::S3 {
897 configured_scheme: "s3".to_string(),
898 customized_credential_load: None,
903 },
904 None,
905 )
906 }
907 IcebergCatalogAuth::Gcp(gcp_connection_reference) => {
908 let (creds_json, service_account) = gcp_connection_reference
909 .connection
910 .read_credentials(storage_configuration)
911 .await
912 .map_err(|e| anyhow!("failed to parse GCP service account JSON: {e}"))?;
913
914 props.insert(
915 GCS_CREDENTIALS_JSON.to_owned(),
916 base64::engine::general_purpose::STANDARD.encode(creds_json),
917 );
918 props.insert(GCS_DISABLE_VM_METADATA.to_owned(), "true".to_owned());
920 props.insert(GCS_DISABLE_CONFIG_LOAD.to_owned(), "true".to_owned());
921 if let Some(project_id) = service_account.project_id() {
922 props.insert(GCS_USER_PROJECT.to_owned(), project_id.to_owned());
923 props.insert(
924 "header.x-goog-user-project".to_owned(),
925 project_id.to_owned(),
926 );
927 }
928
929 (
930 OpenDalStorageFactory::Gcs,
931 Some(iceberg_catalog_rest::BearerTokenAuthenticator::new(
932 Arc::new(GcpTokenProvider { service_account }),
933 )),
934 )
935 }
936 };
937
938 let mut catalog =
939 RestCatalogBuilder::default().with_storage_factory(Arc::new(storage_factory));
940 if let Some(auth) = custom_authenticator {
941 catalog = catalog.with_authenticator(Arc::new(auth));
942 }
943 let catalog = catalog
944 .load("IcebergCatalog", props.into_iter().collect())
945 .await
946 .map_err(|e| anyhow!("failed to create Iceberg catalog: {e}"))?;
947 Ok(Arc::new(catalog))
948 }
949
950 async fn validate(
951 &self,
952 _id: CatalogItemId,
953 storage_configuration: &StorageConfiguration,
954 ) -> Result<(), ConnectionValidationError> {
955 let catalog = self
956 .connect(storage_configuration, InTask::No)
957 .await
958 .map_err(|e| {
959 ConnectionValidationError::Other(anyhow!("failed to connect to catalog: {e}"))
960 })?;
961
962 catalog.list_namespaces(None).await.map_err(|e| {
964 ConnectionValidationError::Other(anyhow!("failed to list namespaces: {e}"))
965 })?;
966
967 Ok(())
968 }
969}
970
/// A connection to an AWS PrivateLink endpoint service.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelinkConnection {
    /// Name of the endpoint service.
    pub service_name: String,
    /// Availability zones associated with the endpoint service.
    pub availability_zones: Vec<String>,
}
976
impl AlterCompatible for AwsPrivatelinkConnection {
    /// Every change to a PrivateLink connection is considered compatible.
    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
        Ok(())
    }
}
983
/// TLS settings for a Kafka connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaTlsConfig {
    /// Client TLS identity, if any.
    pub identity: Option<TlsIdentity>,
    /// Root certificate to trust, if any, as a literal string or a secret.
    pub root_cert: Option<StringOrSecret>,
}
989
/// SASL authentication settings for a Kafka connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaSaslConfig<C: ConnectionAccess = InlinedConnection> {
    /// SASL mechanism name.
    pub mechanism: String,
    /// SASL username, as a literal string or a secret.
    pub username: StringOrSecret,
    /// ID of the secret holding the SASL password, if any.
    pub password: Option<CatalogItemId>,
    /// AWS connection, presumably for IAM-based SASL authentication (TODO confirm).
    pub aws: Option<AwsConnectionReference<C>>,
}
997
998impl<R: ConnectionResolver> IntoInlineConnection<KafkaSaslConfig, R>
999 for KafkaSaslConfig<ReferencedConnection>
1000{
1001 fn into_inline_connection(self, r: R) -> KafkaSaslConfig {
1002 KafkaSaslConfig {
1003 mechanism: self.mechanism,
1004 username: self.username,
1005 password: self.password,
1006 aws: self.aws.map(|aws| aws.into_inline_connection(&r)),
1007 }
1008 }
1009}
1010
/// A Kafka broker, together with how to reach it.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaBroker<C: ConnectionAccess = InlinedConnection> {
    /// The broker's address. Client ID enrichment rules are matched against it.
    pub address: String,
    /// Tunnel used to reach this broker.
    pub tunnel: Tunnel<C>,
}
1019
1020impl<R: ConnectionResolver> IntoInlineConnection<KafkaBroker, R>
1021 for KafkaBroker<ReferencedConnection>
1022{
1023 fn into_inline_connection(self, r: R) -> KafkaBroker {
1024 let KafkaBroker { address, tunnel } = self;
1025 KafkaBroker {
1026 address,
1027 tunnel: tunnel.into_inline_connection(r),
1028 }
1029 }
1030}
1031
/// Options applied to a Kafka topic, for example the progress topic.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Default)]
pub struct KafkaTopicOptions {
    /// Replication factor. `None` presumably defers to the broker default (TODO confirm).
    pub replication_factor: Option<NonNeg<i32>>,
    /// Partition count. `None` presumably defers to the broker default (TODO confirm).
    pub partition_count: Option<NonNeg<i32>>,
    /// Additional topic-level configuration entries.
    pub topic_config: BTreeMap<String, String>,
}
1043
/// Connection details for a Kafka cluster.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct KafkaConnection<C: ConnectionAccess = InlinedConnection> {
    /// Brokers to connect to, each with its own tunnel.
    pub brokers: Vec<KafkaBroker<C>>,
    /// Fallback tunnel, presumably for brokers not listed in `brokers` (TODO confirm).
    pub default_tunnel: Tunnel<C>,
    /// Explicit progress topic name. When `None`, `progress_topic()` derives a default.
    pub progress_topic: Option<String>,
    /// Options for the progress topic.
    pub progress_topic_options: KafkaTopicOptions,
    /// Extra client options, presumably passed through to librdkafka. Values may be
    /// literal strings or secrets.
    pub options: BTreeMap<String, StringOrSecret>,
    /// TLS settings, if configured.
    pub tls: Option<KafkaTlsConfig>,
    /// SASL settings, if configured.
    pub sasl: Option<KafkaSaslConfig<C>>,
}
1057
1058impl<R: ConnectionResolver> IntoInlineConnection<KafkaConnection, R>
1059 for KafkaConnection<ReferencedConnection>
1060{
1061 fn into_inline_connection(self, r: R) -> KafkaConnection {
1062 let KafkaConnection {
1063 brokers,
1064 progress_topic,
1065 progress_topic_options,
1066 default_tunnel,
1067 options,
1068 tls,
1069 sasl,
1070 } = self;
1071
1072 let brokers = brokers
1073 .into_iter()
1074 .map(|broker| broker.into_inline_connection(&r))
1075 .collect();
1076
1077 KafkaConnection {
1078 brokers,
1079 progress_topic,
1080 progress_topic_options,
1081 default_tunnel: default_tunnel.into_inline_connection(&r),
1082 options,
1083 tls,
1084 sasl: sasl.map(|sasl| sasl.into_inline_connection(&r)),
1085 }
1086 }
1087}
1088
1089impl<C: ConnectionAccess> KafkaConnection<C> {
1090 pub fn progress_topic(
1095 &self,
1096 connection_context: &ConnectionContext,
1097 connection_id: CatalogItemId,
1098 ) -> Cow<'_, str> {
1099 if let Some(progress_topic) = &self.progress_topic {
1100 Cow::Borrowed(progress_topic)
1101 } else {
1102 Cow::Owned(format!(
1103 "_materialize-progress-{}-{}",
1104 connection_context.environment_id, connection_id,
1105 ))
1106 }
1107 }
1108
1109 fn validate_by_default(&self) -> bool {
1110 true
1111 }
1112}
1113
1114impl KafkaConnection {
1115 pub fn id_base(
1119 connection_context: &ConnectionContext,
1120 connection_id: CatalogItemId,
1121 object_id: GlobalId,
1122 ) -> String {
1123 format!(
1124 "materialize-{}-{}-{}",
1125 connection_context.environment_id, connection_id, object_id,
1126 )
1127 }
1128
1129 pub fn enrich_client_id(&self, configs: &ConfigSet, client_id: &mut String) {
1132 #[derive(Debug, Deserialize)]
1133 struct EnrichmentRule {
1134 #[serde(deserialize_with = "deserialize_regex")]
1135 pattern: Regex,
1136 payload: String,
1137 }
1138
1139 fn deserialize_regex<'de, D>(deserializer: D) -> Result<Regex, D::Error>
1140 where
1141 D: Deserializer<'de>,
1142 {
1143 let buf = String::deserialize(deserializer)?;
1144 Regex::new(&buf).map_err(serde::de::Error::custom)
1145 }
1146
1147 let rules = KAFKA_CLIENT_ID_ENRICHMENT_RULES.get(configs);
1148 let rules = match serde_json::from_value::<Vec<EnrichmentRule>>(rules) {
1149 Ok(rules) => rules,
1150 Err(e) => {
1151 warn!(%e, "failed to decode kafka_client_id_enrichment_rules");
1152 return;
1153 }
1154 };
1155
1156 debug!(?self.brokers, "evaluating client ID enrichment rules");
1161 for rule in rules {
1162 let is_match = self
1163 .brokers
1164 .iter()
1165 .any(|b| rule.pattern.is_match(&b.address));
1166 debug!(?rule, is_match, "evaluated client ID enrichment rule");
1167 if is_match {
1168 client_id.push('-');
1169 client_id.push_str(&rule.payload);
1170 }
1171 }
1172 }
1173
1174 pub async fn create_with_context<C, T>(
1176 &self,
1177 storage_configuration: &StorageConfiguration,
1178 context: C,
1179 extra_options: &BTreeMap<&str, String>,
1180 in_task: InTask,
1181 ) -> Result<T, ContextCreationError>
1182 where
1183 C: ClientContext,
1184 T: FromClientConfigAndContext<TunnelingClientContext<C>>,
1185 {
1186 let mut options = self.options.clone();
1187
1188 options.insert("allow.auto.create.topics".into(), "false".into());
1193
1194 let brokers = match &self.default_tunnel {
1195 Tunnel::AwsPrivatelink(t) => {
1196 assert!(&self.brokers.is_empty());
1197
1198 let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM
1199 .get(storage_configuration.config_set());
1200 options.insert("ssl.endpoint.identification.algorithm".into(), algo.into());
1201
1202 format!(
1205 "{}:{}",
1206 vpc_endpoint_host(
1207 t.connection_id,
1208 None, ),
1210 t.port.unwrap_or(9092)
1211 )
1212 }
1213 Tunnel::AwsPrivatelinks(_pl) => {
1214 let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM
1215 .get(storage_configuration.config_set());
1216 options.insert("ssl.endpoint.identification.algorithm".into(), algo.into());
1217
1218 if self.brokers.is_empty() {
1219 return Err(ContextCreationError::Other(anyhow::anyhow!(
1220 "at least one static broker is required when using BROKER or BROKERS"
1221 )));
1222 }
1223 self.brokers.iter().map(|b| &b.address).join(",")
1224 }
1225 _ => self.brokers.iter().map(|b| &b.address).join(","),
1226 };
1227 options.insert("bootstrap.servers".into(), brokers.clone().into());
1228 let security_protocol = match (self.tls.is_some(), self.sasl.is_some()) {
1229 (false, false) => "PLAINTEXT",
1230 (true, false) => "SSL",
1231 (false, true) => "SASL_PLAINTEXT",
1232 (true, true) => "SASL_SSL",
1233 };
1234 info!(
1235 "kafka: create_with_context bootstrap.servers={brokers}, security_protocol={security_protocol}"
1236 );
1237 options.insert("security.protocol".into(), security_protocol.into());
1238 if let Some(tls) = &self.tls {
1239 if let Some(root_cert) = &tls.root_cert {
1240 options.insert("ssl.ca.pem".into(), root_cert.clone());
1241 }
1242 if let Some(identity) = &tls.identity {
1243 options.insert("ssl.key.pem".into(), StringOrSecret::Secret(identity.key));
1244 options.insert("ssl.certificate.pem".into(), identity.cert.clone());
1245 }
1246 }
1247 if let Some(sasl) = &self.sasl {
1248 options.insert("sasl.mechanisms".into(), (&sasl.mechanism).into());
1249 options.insert("sasl.username".into(), sasl.username.clone());
1250 if let Some(password) = sasl.password {
1251 options.insert("sasl.password".into(), StringOrSecret::Secret(password));
1252 }
1253 }
1254
1255 options.insert(
1256 "retry.backoff.ms".into(),
1257 KAFKA_RETRY_BACKOFF
1258 .get(storage_configuration.config_set())
1259 .as_millis()
1260 .into(),
1261 );
1262 options.insert(
1263 "retry.backoff.max.ms".into(),
1264 KAFKA_RETRY_BACKOFF_MAX
1265 .get(storage_configuration.config_set())
1266 .as_millis()
1267 .into(),
1268 );
1269 options.insert(
1270 "reconnect.backoff.ms".into(),
1271 KAFKA_RECONNECT_BACKOFF
1272 .get(storage_configuration.config_set())
1273 .as_millis()
1274 .into(),
1275 );
1276 options.insert(
1277 "reconnect.backoff.max.ms".into(),
1278 KAFKA_RECONNECT_BACKOFF_MAX
1279 .get(storage_configuration.config_set())
1280 .as_millis()
1281 .into(),
1282 );
1283
1284 let mut config = mz_kafka_util::client::create_new_client_config(
1285 storage_configuration
1286 .connection_context
1287 .librdkafka_log_level,
1288 storage_configuration.parameters.kafka_timeout_config,
1289 );
1290 for (k, v) in options {
1291 config.set(
1292 k,
1293 v.get_string(
1294 in_task,
1295 &storage_configuration.connection_context.secrets_reader,
1296 )
1297 .await
1298 .context("reading kafka secret")?,
1299 );
1300 }
1301 for (k, v) in extra_options {
1302 config.set(*k, v);
1303 }
1304
1305 let aws_config = match self.sasl.as_ref().and_then(|sasl| sasl.aws.as_ref()) {
1306 None => None,
1307 Some(aws) => Some(
1308 aws.connection
1309 .load_sdk_config(
1310 &storage_configuration.connection_context,
1311 aws.connection_id,
1312 in_task,
1313 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1314 )
1315 .await?,
1316 ),
1317 };
1318
1319 let mut context = TunnelingClientContext::new(
1323 context,
1324 Handle::current(),
1325 storage_configuration
1326 .connection_context
1327 .ssh_tunnel_manager
1328 .clone(),
1329 storage_configuration.parameters.ssh_timeout_config,
1330 aws_config,
1331 in_task,
1332 );
1333
1334 match &self.default_tunnel {
1335 Tunnel::Direct => {
1336 }
1338 Tunnel::AwsPrivatelink(pl) => {
1339 context.set_default_tunnel(TunnelConfig::StaticHost(
1340 KafkaConnection::from_default_aws_privatelink(pl).host,
1342 ));
1343 }
1344 Tunnel::AwsPrivatelinks(pl) => {
1345 context.set_default_tunnel(TunnelConfig::Rules(
1346 KafkaConnection::from_aws_privatelinks(pl),
1347 ));
1348 }
1349 Tunnel::Ssh(ssh_tunnel) => {
1350 let secret = storage_configuration
1351 .connection_context
1352 .secrets_reader
1353 .read_in_task_if(in_task, ssh_tunnel.connection_id)
1354 .await?;
1355 let key_pair = SshKeyPair::from_bytes(&secret)?;
1356
1357 let resolved = resolve_address(
1359 &ssh_tunnel.connection.host,
1360 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1361 )
1362 .await?;
1363 context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig {
1364 host: resolved
1365 .iter()
1366 .map(|a| a.to_string())
1367 .collect::<BTreeSet<_>>(),
1368 port: ssh_tunnel.connection.port,
1369 user: ssh_tunnel.connection.user.clone(),
1370 key_pair,
1371 }));
1372 }
1373 }
1374 info!(
1375 "kafka: tunnel config set to {}",
1376 match &self.default_tunnel {
1377 Tunnel::Direct => "Direct".to_string(),
1378 Tunnel::AwsPrivatelink(_) => "AwsPrivatelink (static host)".to_string(),
1379 Tunnel::AwsPrivatelinks(pl) =>
1380 format!("AwsPrivatelinks ({} rules)", pl.rules.len()),
1381 Tunnel::Ssh(_) => "Ssh".to_string(),
1382 }
1383 );
1384
1385 for broker in &self.brokers {
1388 let mut addr_parts = broker.address.splitn(2, ':');
1389 let addr = BrokerAddr {
1390 host: addr_parts
1391 .next()
1392 .context("BROKER is not address:port")?
1393 .into(),
1394 port: addr_parts
1395 .next()
1396 .unwrap_or("9092")
1397 .parse()
1398 .context("parsing BROKER port")?,
1399 };
1400 match &broker.tunnel {
1401 Tunnel::Direct => {
1402 }
1412 Tunnel::AwsPrivatelink(aws_privatelink) => {
1413 context.add_broker_rewrite(
1414 addr,
1415 KafkaConnection::from_aws_privatelink(aws_privatelink),
1416 );
1417 }
1418 Tunnel::AwsPrivatelinks(_) => unreachable!(
1419 "Individually predefined brokers do not use rule-based PrivateLinks routing."
1420 ),
1421 Tunnel::Ssh(ssh_tunnel) => {
1422 let ssh_host_resolved = resolve_address(
1424 &ssh_tunnel.connection.host,
1425 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1426 )
1427 .await?;
1428 context
1429 .add_ssh_tunnel(
1430 addr,
1431 SshTunnelConfig {
1432 host: ssh_host_resolved
1433 .iter()
1434 .map(|a| a.to_string())
1435 .collect::<BTreeSet<_>>(),
1436 port: ssh_tunnel.connection.port,
1437 user: ssh_tunnel.connection.user.clone(),
1438 key_pair: SshKeyPair::from_bytes(
1439 &storage_configuration
1440 .connection_context
1441 .secrets_reader
1442 .read_in_task_if(in_task, ssh_tunnel.connection_id)
1443 .await?,
1444 )?,
1445 },
1446 )
1447 .await
1448 .map_err(ContextCreationError::Ssh)?;
1449 }
1450 }
1451 }
1452
1453 Ok(config.create_with_context(context)?)
1454 }
1455
1456 async fn validate(
1457 &self,
1458 _id: CatalogItemId,
1459 storage_configuration: &StorageConfiguration,
1460 ) -> Result<(), anyhow::Error> {
1461 let (context, error_rx) = MzClientContext::with_errors();
1462 let consumer: BaseConsumer<_> = self
1463 .create_with_context(
1464 storage_configuration,
1465 context,
1466 &BTreeMap::new(),
1467 InTask::No,
1469 )
1470 .await?;
1471 let consumer = Arc::new(consumer);
1472
1473 let timeout = storage_configuration
1474 .parameters
1475 .kafka_timeout_config
1476 .fetch_metadata_timeout;
1477
1478 info!("kafka: starting connection validation via fetch_metadata (timeout={timeout:?})");
1489 let result = mz_ore::task::spawn_blocking(|| "kafka_get_metadata", {
1490 let consumer = Arc::clone(&consumer);
1491 move || consumer.fetch_metadata(None, timeout)
1492 })
1493 .await;
1494 info!(
1495 "kafka: connection validation result: {}",
1496 if result.is_ok() { "success" } else { "failed" },
1497 );
1498 match result {
1499 Ok(_) => Ok(()),
1500 Err(err) => {
1505 let main_err = error_rx.try_iter().reduce(|cur, new| match cur {
1509 MzKafkaError::Internal(_) => new,
1510 _ => cur,
1511 });
1512
1513 drop(consumer);
1517
1518 match main_err {
1519 Some(err) => Err(err.into()),
1520 None => Err(err.into()),
1521 }
1522 }
1523 }
1524 }
1525
1526 fn from_default_aws_privatelink(pl: &AwsPrivatelink) -> BrokerRewrite {
1528 BrokerRewrite {
1529 host: vpc_endpoint_host(
1530 pl.connection_id,
1531 None, ),
1533 port: pl.port,
1534 }
1535 }
1536
1537 fn from_aws_privatelink(pl: &AwsPrivatelink) -> BrokerRewrite {
1539 BrokerRewrite {
1540 host: vpc_endpoint_host(pl.connection_id, pl.availability_zone.as_deref()),
1541 port: pl.port,
1542 }
1543 }
1544
1545 fn from_aws_privatelink_rule(
1546 AwsPrivatelinkRule { pattern, to }: &AwsPrivatelinkRule,
1547 ) -> (mz_kafka_util::client::ConnectionRulePattern, BrokerRewrite) {
1548 (
1549 mz_kafka_util::client::ConnectionRulePattern {
1550 prefix_wildcard: pattern.prefix_wildcard,
1551 literal_match: pattern.literal_match.clone(),
1552 suffix_wildcard: pattern.suffix_wildcard,
1553 },
1554 KafkaConnection::from_aws_privatelink(to),
1555 )
1556 }
1557
1558 fn from_aws_privatelinks(pl: &AwsPrivatelinks) -> HostMappingRules {
1559 HostMappingRules {
1560 rules: pl
1561 .rules
1562 .iter()
1563 .map(KafkaConnection::from_aws_privatelink_rule)
1564 .collect_vec(),
1565 }
1566 }
1567}
1568
1569impl<C: ConnectionAccess> AlterCompatible for KafkaConnection<C> {
1570 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1571 let KafkaConnection {
1572 brokers: _,
1573 default_tunnel: _,
1574 progress_topic,
1575 progress_topic_options,
1576 options: _,
1577 tls: _,
1578 sasl: _,
1579 } = self;
1580
1581 let compatibility_checks = [
1582 (progress_topic == &other.progress_topic, "progress_topic"),
1583 (
1584 progress_topic_options == &other.progress_topic_options,
1585 "progress_topic_options",
1586 ),
1587 ];
1588
1589 for (compatible, field) in compatibility_checks {
1590 if !compatible {
1591 tracing::warn!(
1592 "KafkaConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1593 self,
1594 other
1595 );
1596
1597 return Err(AlterError { id });
1598 }
1599 }
1600
1601 Ok(())
1602 }
1603}
1604
/// A connection to a Confluent-compatible schema registry.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnection<C: ConnectionAccess = InlinedConnection> {
    /// Base URL of the registry; its host is required.
    pub url: Url,
    /// Optional PEM root certificate added to the client's trust roots.
    pub tls_root_cert: Option<StringOrSecret>,
    /// Optional client TLS identity (certificate plus secret key).
    pub tls_identity: Option<TlsIdentity>,
    /// Optional HTTP authentication credentials.
    pub http_auth: Option<CsrConnectionHttpAuth>,
    /// How to reach the registry host.
    pub tunnel: Tunnel<C>,
}
1620
1621impl<R: ConnectionResolver> IntoInlineConnection<CsrConnection, R>
1622 for CsrConnection<ReferencedConnection>
1623{
1624 fn into_inline_connection(self, r: R) -> CsrConnection {
1625 let CsrConnection {
1626 url,
1627 tls_root_cert,
1628 tls_identity,
1629 http_auth,
1630 tunnel,
1631 } = self;
1632 CsrConnection {
1633 url,
1634 tls_root_cert,
1635 tls_identity,
1636 http_auth,
1637 tunnel: tunnel.into_inline_connection(r),
1638 }
1639 }
1640}
1641
1642impl<C: ConnectionAccess> CsrConnection<C> {
1643 fn validate_by_default(&self) -> bool {
1644 true
1645 }
1646}
1647
1648impl CsrConnection {
1649 pub async fn connect(
1651 &self,
1652 storage_configuration: &StorageConfiguration,
1653 in_task: InTask,
1654 ) -> Result<mz_ccsr::Client, CsrConnectError> {
1655 let mut client_config = mz_ccsr::ClientConfig::new(self.url.clone());
1656 if let Some(root_cert) = &self.tls_root_cert {
1657 let root_cert = root_cert
1658 .get_string(
1659 in_task,
1660 &storage_configuration.connection_context.secrets_reader,
1661 )
1662 .await?;
1663 let root_cert = Certificate::from_pem(root_cert.as_bytes())?;
1664 client_config = client_config.add_root_certificate(root_cert);
1665 }
1666
1667 if let Some(tls_identity) = &self.tls_identity {
1668 let key = &storage_configuration
1669 .connection_context
1670 .secrets_reader
1671 .read_string_in_task_if(in_task, tls_identity.key)
1672 .await?;
1673 let cert = tls_identity
1674 .cert
1675 .get_string(
1676 in_task,
1677 &storage_configuration.connection_context.secrets_reader,
1678 )
1679 .await?;
1680 let ident = Identity::from_pem(key.as_bytes(), cert.as_bytes())?;
1681 client_config = client_config.identity(ident);
1682 }
1683
1684 if let Some(http_auth) = &self.http_auth {
1685 let username = http_auth
1686 .username
1687 .get_string(
1688 in_task,
1689 &storage_configuration.connection_context.secrets_reader,
1690 )
1691 .await?;
1692 let password = match http_auth.password {
1693 None => None,
1694 Some(password) => Some(
1695 storage_configuration
1696 .connection_context
1697 .secrets_reader
1698 .read_string_in_task_if(in_task, password)
1699 .await?,
1700 ),
1701 };
1702 client_config = client_config.auth(username, password);
1703 }
1704
1705 let host = self
1707 .url
1708 .host_str()
1709 .ok_or_else(|| anyhow!("url missing host"))?;
1710 match &self.tunnel {
1711 Tunnel::Direct => {
1712 let resolved = resolve_address(
1714 host,
1715 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1716 )
1717 .await?;
1718 client_config = client_config.resolve_to_addrs(
1719 host,
1720 &resolved
1721 .iter()
1722 .map(|addr| SocketAddr::new(*addr, 0))
1723 .collect::<Vec<_>>(),
1724 )
1725 }
1726 Tunnel::Ssh(ssh_tunnel) => {
1727 let ssh_tunnel = ssh_tunnel
1728 .connect(
1729 storage_configuration,
1730 host,
1731 self.url.port_or_known_default().unwrap_or(80),
1734 in_task,
1735 )
1736 .await
1737 .map_err(CsrConnectError::Ssh)?;
1738
1739 client_config = client_config
1745 .resolve_to_addrs(host, &[SocketAddr::new(ssh_tunnel.local_addr().ip(), 0)])
1752 .dynamic_url({
1763 let remote_url = self.url.clone();
1764 move || {
1765 let mut url = remote_url.clone();
1766 url.set_port(Some(ssh_tunnel.local_addr().port()))
1767 .expect("cannot fail");
1768 url
1769 }
1770 });
1771 }
1772 Tunnel::AwsPrivatelink(connection) => {
1773 assert_none!(connection.port);
1774
1775 let privatelink_host = mz_cloud_resources::vpc_endpoint_host(
1776 connection.connection_id,
1777 connection.availability_zone.as_deref(),
1778 );
1779 let addrs: Vec<_> = net::lookup_host((privatelink_host, 0))
1780 .await
1781 .context("resolving PrivateLink host")?
1782 .collect();
1783 client_config = client_config.resolve_to_addrs(host, &addrs)
1784 }
1785 Tunnel::AwsPrivatelinks(_) => {
1786 unreachable!("MATCHING broker rules are only available for Kafka connections.");
1787 }
1788 }
1789
1790 Ok(client_config.build()?)
1791 }
1792
1793 async fn validate(
1794 &self,
1795 _id: CatalogItemId,
1796 storage_configuration: &StorageConfiguration,
1797 ) -> Result<(), anyhow::Error> {
1798 let client = self
1799 .connect(
1800 storage_configuration,
1801 InTask::No,
1803 )
1804 .await?;
1805 client.list_subjects().await?;
1806 Ok(())
1807 }
1808}
1809
1810impl<C: ConnectionAccess> AlterCompatible for CsrConnection<C> {
1811 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1812 let CsrConnection {
1813 tunnel,
1814 url: _,
1816 tls_root_cert: _,
1817 tls_identity: _,
1818 http_auth: _,
1819 } = self;
1820
1821 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1822
1823 for (compatible, field) in compatibility_checks {
1824 if !compatible {
1825 tracing::warn!(
1826 "CsrConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1827 self,
1828 other
1829 );
1830
1831 return Err(AlterError { id });
1832 }
1833 }
1834 Ok(())
1835 }
1836}
1837
/// A connection to an AWS Glue Schema Registry.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct GlueSchemaRegistryConnection<C: ConnectionAccess = InlinedConnection> {
    /// AWS connection used to load the SDK configuration (credentials/region).
    pub aws_connection: AwsConnectionReference<C>,
    /// Name of the registry within the configured AWS account/region.
    pub registry_name: String,
}
1850
1851impl<R: ConnectionResolver> IntoInlineConnection<GlueSchemaRegistryConnection, R>
1852 for GlueSchemaRegistryConnection<ReferencedConnection>
1853{
1854 fn into_inline_connection(self, r: R) -> GlueSchemaRegistryConnection {
1855 let GlueSchemaRegistryConnection {
1856 aws_connection,
1857 registry_name,
1858 } = self;
1859 GlueSchemaRegistryConnection {
1860 aws_connection: aws_connection.into_inline_connection(&r),
1861 registry_name,
1862 }
1863 }
1864}
1865
1866impl<C: ConnectionAccess> GlueSchemaRegistryConnection<C> {
1867 fn validate_by_default(&self) -> bool {
1868 true
1872 }
1873}
1874
1875impl GlueSchemaRegistryConnection {
1876 async fn validate(
1877 &self,
1878 _id: CatalogItemId,
1879 storage_configuration: &StorageConfiguration,
1880 ) -> Result<(), anyhow::Error> {
1881 let enforce_external_addresses =
1882 crate::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set());
1883 let sdk_config = self
1884 .aws_connection
1885 .connection
1886 .load_sdk_config(
1887 &storage_configuration.connection_context,
1888 self.aws_connection.connection_id,
1889 InTask::No,
1891 enforce_external_addresses,
1892 )
1893 .await?;
1894 let client = mz_aws_glue_schema_registry::ClientConfig::new(sdk_config).build();
1895 match client.get_registry(&self.registry_name).await {
1896 Ok(_) => Ok(()),
1897 Err(mz_aws_glue_schema_registry::GetRegistryError::NotFound) => Err(anyhow!(
1898 "AWS Glue Schema Registry {:?} does not exist in the configured account/region",
1899 self.registry_name
1900 )),
1901 Err(err) => Err(anyhow::Error::new(err).context(format!(
1902 "failed to validate AWS Glue Schema Registry connection (registry={:?})",
1903 self.registry_name
1904 ))),
1905 }
1906 }
1907}
1908
1909impl<C: ConnectionAccess> AlterCompatible for GlueSchemaRegistryConnection<C> {
1910 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1911 let GlueSchemaRegistryConnection {
1912 registry_name,
1913 aws_connection: _,
1916 } = self;
1917
1918 let compatibility_checks = [(registry_name == &other.registry_name, "registry_name")];
1919
1920 for (compatible, field) in compatibility_checks {
1921 if !compatible {
1922 tracing::warn!(
1923 "GlueSchemaRegistryConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1924 self,
1925 other
1926 );
1927
1928 return Err(AlterError { id });
1929 }
1930 }
1931 Ok(())
1932 }
1933}
1934
/// A client TLS identity: a certificate plus a private key.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct TlsIdentity {
    /// The client certificate (PEM), given inline or as a secret.
    pub cert: StringOrSecret,
    /// ID of the secret holding the private key (PEM).
    pub key: CatalogItemId,
}
1944
/// HTTP authentication credentials for a schema registry connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct CsrConnectionHttpAuth {
    /// The username, given inline or as a secret.
    pub username: StringOrSecret,
    /// ID of the secret holding the password, if any.
    pub password: Option<CatalogItemId>,
}
1953
/// A connection to a PostgreSQL server.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct PostgresConnection<C: ConnectionAccess = InlinedConnection> {
    /// Hostname of the server.
    pub host: String,
    /// Port of the server.
    pub port: u16,
    /// Database to connect to.
    pub database: String,
    /// Username, given inline or as a secret.
    pub user: StringOrSecret,
    /// ID of the secret holding the password, if any.
    pub password: Option<CatalogItemId>,
    /// How to reach the server.
    pub tunnel: Tunnel<C>,
    /// TLS mode passed to the client configuration.
    pub tls_mode: SslMode,
    /// Optional PEM root certificate for verifying the server.
    pub tls_root_cert: Option<StringOrSecret>,
    /// Optional client TLS identity.
    pub tls_identity: Option<TlsIdentity>,
}
1977
1978impl<R: ConnectionResolver> IntoInlineConnection<PostgresConnection, R>
1979 for PostgresConnection<ReferencedConnection>
1980{
1981 fn into_inline_connection(self, r: R) -> PostgresConnection {
1982 let PostgresConnection {
1983 host,
1984 port,
1985 database,
1986 user,
1987 password,
1988 tunnel,
1989 tls_mode,
1990 tls_root_cert,
1991 tls_identity,
1992 } = self;
1993
1994 PostgresConnection {
1995 host,
1996 port,
1997 database,
1998 user,
1999 password,
2000 tunnel: tunnel.into_inline_connection(r),
2001 tls_mode,
2002 tls_root_cert,
2003 tls_identity,
2004 }
2005 }
2006}
2007
2008impl<C: ConnectionAccess> PostgresConnection<C> {
2009 fn validate_by_default(&self) -> bool {
2010 true
2011 }
2012}
2013
2014impl PostgresConnection<InlinedConnection> {
2015 pub async fn config(
2016 &self,
2017 secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
2018 storage_configuration: &StorageConfiguration,
2019 in_task: InTask,
2020 ) -> Result<mz_postgres_util::Config, anyhow::Error> {
2021 let params = &storage_configuration.parameters;
2022
2023 let mut config = tokio_postgres::Config::new();
2024 config
2025 .host(&self.host)
2026 .port(self.port)
2027 .dbname(&self.database)
2028 .user(&self.user.get_string(in_task, secrets_reader).await?)
2029 .ssl_mode(self.tls_mode);
2030 if let Some(password) = self.password {
2031 let password = secrets_reader
2032 .read_string_in_task_if(in_task, password)
2033 .await?;
2034 config.password(password);
2035 }
2036 if let Some(tls_root_cert) = &self.tls_root_cert {
2037 let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
2038 config.ssl_root_cert(tls_root_cert.as_bytes());
2039 }
2040 if let Some(tls_identity) = &self.tls_identity {
2041 let cert = tls_identity
2042 .cert
2043 .get_string(in_task, secrets_reader)
2044 .await?;
2045 let key = secrets_reader
2046 .read_string_in_task_if(in_task, tls_identity.key)
2047 .await?;
2048 config.ssl_cert(cert.as_bytes()).ssl_key(key.as_bytes());
2049 }
2050
2051 if let Some(connect_timeout) = params.pg_source_connect_timeout {
2052 config.connect_timeout(connect_timeout);
2053 }
2054 if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
2055 config.keepalives_retries(keepalives_retries);
2056 }
2057 if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
2058 config.keepalives_idle(keepalives_idle);
2059 }
2060 if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
2061 config.keepalives_interval(keepalives_interval);
2062 }
2063 if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
2064 config.tcp_user_timeout(tcp_user_timeout);
2065 }
2066
2067 let mut options = vec![];
2068 if let Some(wal_sender_timeout) = params.pg_source_wal_sender_timeout {
2069 options.push(format!(
2070 "--wal_sender_timeout={}",
2071 wal_sender_timeout.as_millis()
2072 ));
2073 };
2074 if params.pg_source_tcp_configure_server {
2075 if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
2076 options.push(format!("--tcp_keepalives_count={}", keepalives_retries));
2077 }
2078 if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
2079 options.push(format!(
2080 "--tcp_keepalives_idle={}",
2081 keepalives_idle.as_secs()
2082 ));
2083 }
2084 if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
2085 options.push(format!(
2086 "--tcp_keepalives_interval={}",
2087 keepalives_interval.as_secs()
2088 ));
2089 }
2090 if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
2091 options.push(format!(
2092 "--tcp_user_timeout={}",
2093 tcp_user_timeout.as_millis()
2094 ));
2095 }
2096 }
2097 config.options(options.join(" ").as_str());
2098
2099 let tunnel = match &self.tunnel {
2100 Tunnel::Direct => {
2101 let resolved = resolve_address(
2103 &self.host,
2104 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2105 )
2106 .await?;
2107 mz_postgres_util::TunnelConfig::Direct {
2108 resolved_ips: Some(resolved),
2109 }
2110 }
2111 Tunnel::Ssh(SshTunnel {
2112 connection_id,
2113 connection,
2114 }) => {
2115 let secret = secrets_reader
2116 .read_in_task_if(in_task, *connection_id)
2117 .await?;
2118 let key_pair = SshKeyPair::from_bytes(&secret)?;
2119 let resolved = resolve_address(
2121 &connection.host,
2122 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2123 )
2124 .await?;
2125 mz_postgres_util::TunnelConfig::Ssh {
2126 config: SshTunnelConfig {
2127 host: resolved
2128 .iter()
2129 .map(|a| a.to_string())
2130 .collect::<BTreeSet<_>>(),
2131 port: connection.port,
2132 user: connection.user.clone(),
2133 key_pair,
2134 },
2135 }
2136 }
2137 Tunnel::AwsPrivatelink(connection) => {
2138 assert_none!(connection.port);
2139 mz_postgres_util::TunnelConfig::AwsPrivatelink {
2140 connection_id: connection.connection_id,
2141 }
2142 }
2143 Tunnel::AwsPrivatelinks(_) => {
2144 unreachable!("MATCHING broker rules are only available for Kafka connections.");
2145 }
2146 };
2147
2148 Ok(mz_postgres_util::Config::new(
2149 config,
2150 tunnel,
2151 params.ssh_timeout_config,
2152 in_task,
2153 )?)
2154 }
2155
2156 pub async fn validate(
2157 &self,
2158 _id: CatalogItemId,
2159 storage_configuration: &StorageConfiguration,
2160 ) -> Result<mz_postgres_util::Client, anyhow::Error> {
2161 let config = self
2162 .config(
2163 &storage_configuration.connection_context.secrets_reader,
2164 storage_configuration,
2165 InTask::No,
2167 )
2168 .await?;
2169 let client = config
2170 .connect(
2171 "connection validation",
2172 &storage_configuration.connection_context.ssh_tunnel_manager,
2173 )
2174 .await?;
2175
2176 let wal_level = mz_postgres_util::get_wal_level(&client).await?;
2177
2178 if wal_level < mz_postgres_util::replication::WalLevel::Logical {
2179 Err(PostgresConnectionValidationError::InsufficientWalLevel { wal_level })?;
2180 }
2181
2182 let max_wal_senders = mz_postgres_util::get_max_wal_senders(&client).await?;
2183
2184 if max_wal_senders < 1 {
2185 Err(PostgresConnectionValidationError::ReplicationDisabled)?;
2186 }
2187
2188 let available_replication_slots =
2189 mz_postgres_util::available_replication_slots(&client).await?;
2190
2191 if available_replication_slots < 2 {
2193 Err(
2194 PostgresConnectionValidationError::InsufficientReplicationSlotsAvailable {
2195 count: 2,
2196 },
2197 )?;
2198 }
2199
2200 Ok(client)
2201 }
2202}
2203
/// Reasons a PostgreSQL server fails connection validation.
#[derive(Debug, Clone, thiserror::Error)]
pub enum PostgresConnectionValidationError {
    /// Fewer replication slots are free than required; `count` is the number needed.
    #[error("PostgreSQL server has insufficient number of replication slots available")]
    InsufficientReplicationSlotsAvailable { count: usize },
    /// The server's `wal_level` is below `logical`.
    #[error("server must have wal_level >= logical, but has {wal_level}")]
    InsufficientWalLevel {
        wal_level: mz_postgres_util::replication::WalLevel,
    },
    /// `max_wal_senders` is less than 1.
    #[error("replication disabled on server")]
    ReplicationDisabled,
}
2215
2216impl PostgresConnectionValidationError {
2217 pub fn detail(&self) -> Option<String> {
2218 match self {
2219 Self::InsufficientReplicationSlotsAvailable { count } => Some(format!(
2220 "executing this statement requires {} replication slot{}",
2221 count,
2222 if *count == 1 { "" } else { "s" }
2223 )),
2224 _ => None,
2225 }
2226 }
2227
2228 pub fn hint(&self) -> Option<String> {
2229 match self {
2230 Self::InsufficientReplicationSlotsAvailable { .. } => Some(
2231 "you might be able to wait for other sources to finish snapshotting and try again"
2232 .into(),
2233 ),
2234 Self::ReplicationDisabled => Some("set max_wal_senders to a value > 0".into()),
2235 Self::InsufficientWalLevel { .. } => None,
2236 }
2237 }
2238}
2239
2240impl<C: ConnectionAccess> AlterCompatible for PostgresConnection<C> {
2241 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2242 let PostgresConnection {
2243 tunnel,
2244 host: _,
2246 port: _,
2247 database: _,
2248 user: _,
2249 password: _,
2250 tls_mode: _,
2251 tls_root_cert: _,
2252 tls_identity: _,
2253 } = self;
2254
2255 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2256
2257 for (compatible, field) in compatibility_checks {
2258 if !compatible {
2259 tracing::warn!(
2260 "PostgresConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2261 self,
2262 other
2263 );
2264
2265 return Err(AlterError { id });
2266 }
2267 }
2268 Ok(())
2269 }
2270}
2271
/// How a connection reaches its remote endpoint.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum Tunnel<C: ConnectionAccess = InlinedConnection> {
    /// Connect directly, without any tunnel.
    Direct,
    /// Connect through an SSH tunnel connection.
    Ssh(SshTunnel<C>),
    /// Connect through a single AWS PrivateLink endpoint.
    AwsPrivatelink(AwsPrivatelink),
    /// Route brokers through AWS PrivateLink endpoints chosen by matching
    /// rules; only supported for Kafka connections.
    AwsPrivatelinks(AwsPrivatelinks),
}
2283
2284impl<R: ConnectionResolver> IntoInlineConnection<Tunnel, R> for Tunnel<ReferencedConnection> {
2285 fn into_inline_connection(self, r: R) -> Tunnel {
2286 match self {
2287 Tunnel::Direct => Tunnel::Direct,
2288 Tunnel::Ssh(ssh) => Tunnel::Ssh(ssh.into_inline_connection(r)),
2289 Tunnel::AwsPrivatelink(awspl) => Tunnel::AwsPrivatelink(awspl),
2290 Tunnel::AwsPrivatelinks(x) => Tunnel::AwsPrivatelinks(x),
2291 }
2292 }
2293}
2294
2295impl<C: ConnectionAccess> AlterCompatible for Tunnel<C> {
2296 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2297 let compatible = match (self, other) {
2298 (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o).is_ok(),
2299 (s, o) => s == o,
2300 };
2301
2302 if !compatible {
2303 tracing::warn!(
2304 "Tunnel incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
2305 self,
2306 other
2307 );
2308
2309 return Err(AlterError { id });
2310 }
2311
2312 Ok(())
2313 }
2314}
2315
/// TLS mode for MySQL connections.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum MySqlSslMode {
    /// No TLS.
    Disabled,
    /// TLS without certificate or hostname verification.
    Required,
    /// TLS with certificate verification but no hostname verification.
    VerifyCa,
    /// TLS with both certificate and hostname verification.
    VerifyIdentity,
}
2326
/// A connection to a MySQL server.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct MySqlConnection<C: ConnectionAccess = InlinedConnection> {
    /// Hostname or IP of the server.
    pub host: String,
    /// TCP port of the server.
    pub port: u16,
    /// Username, given inline or as a secret.
    pub user: StringOrSecret,
    /// ID of the secret holding the password, if any.
    pub password: Option<CatalogItemId>,
    /// How to reach the server.
    pub tunnel: Tunnel<C>,
    /// TLS mode.
    pub tls_mode: MySqlSslMode,
    /// Optional PEM root certificate; applied only in the `VerifyCa` and
    /// `VerifyIdentity` modes.
    pub tls_root_cert: Option<StringOrSecret>,
    /// Optional client TLS identity.
    pub tls_identity: Option<TlsIdentity>,
    /// Optional AWS connection.
    /// NOTE(review): presumably used for AWS-based (IAM) authentication — confirm.
    pub aws_connection: Option<AwsConnectionReference<C>>,
}
2351
2352impl<R: ConnectionResolver> IntoInlineConnection<MySqlConnection, R>
2353 for MySqlConnection<ReferencedConnection>
2354{
2355 fn into_inline_connection(self, r: R) -> MySqlConnection {
2356 let MySqlConnection {
2357 host,
2358 port,
2359 user,
2360 password,
2361 tunnel,
2362 tls_mode,
2363 tls_root_cert,
2364 tls_identity,
2365 aws_connection,
2366 } = self;
2367
2368 MySqlConnection {
2369 host,
2370 port,
2371 user,
2372 password,
2373 tunnel: tunnel.into_inline_connection(&r),
2374 tls_mode,
2375 tls_root_cert,
2376 tls_identity,
2377 aws_connection: aws_connection.map(|aws| aws.into_inline_connection(&r)),
2378 }
2379 }
2380}
2381
2382impl<C: ConnectionAccess> MySqlConnection<C> {
2383 fn validate_by_default(&self) -> bool {
2384 true
2385 }
2386}
2387
2388impl MySqlConnection<InlinedConnection> {
2389 pub async fn config(
2390 &self,
2391 secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
2392 storage_configuration: &StorageConfiguration,
2393 in_task: InTask,
2394 ) -> Result<mz_mysql_util::Config, anyhow::Error> {
2395 let mut opts = mysql_async::OptsBuilder::default()
2397 .ip_or_hostname(&self.host)
2398 .tcp_port(self.port)
2399 .user(Some(&self.user.get_string(in_task, secrets_reader).await?));
2400
2401 if let Some(password) = self.password {
2402 let password = secrets_reader
2403 .read_string_in_task_if(in_task, password)
2404 .await?;
2405 opts = opts.pass(Some(password));
2406 }
2407
2408 let mut ssl_opts = match self.tls_mode {
2413 MySqlSslMode::Disabled => None,
2414 MySqlSslMode::Required => Some(
2415 mysql_async::SslOpts::default()
2416 .with_danger_accept_invalid_certs(true)
2417 .with_danger_skip_domain_validation(true),
2418 ),
2419 MySqlSslMode::VerifyCa => {
2420 Some(mysql_async::SslOpts::default().with_danger_skip_domain_validation(true))
2421 }
2422 MySqlSslMode::VerifyIdentity => Some(mysql_async::SslOpts::default()),
2423 };
2424
2425 if matches!(
2426 self.tls_mode,
2427 MySqlSslMode::VerifyCa | MySqlSslMode::VerifyIdentity
2428 ) {
2429 if let Some(tls_root_cert) = &self.tls_root_cert {
2430 let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
2431 ssl_opts = ssl_opts.map(|opts| {
2432 opts.with_root_certs(vec![tls_root_cert.as_bytes().to_vec().into()])
2433 });
2434 }
2435 }
2436
2437 if let Some(identity) = &self.tls_identity {
2438 let key = secrets_reader
2439 .read_string_in_task_if(in_task, identity.key)
2440 .await?;
2441 let cert = identity.cert.get_string(in_task, secrets_reader).await?;
2442 let (der, pass) =
2443 mz_tls_util::pkcs12der_from_pem(key.as_bytes(), cert.as_bytes())?.into_parts();
2444
2445 ssl_opts = ssl_opts.map(|opts| {
2447 opts.with_client_identity(Some(
2448 mysql_async::ClientIdentity::new(der.into()).with_password(pass),
2449 ))
2450 });
2451 }
2452
2453 opts = opts.ssl_opts(ssl_opts);
2454
2455 let tunnel = match &self.tunnel {
2456 Tunnel::Direct => {
2457 let resolved = resolve_address(
2459 &self.host,
2460 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2461 )
2462 .await?;
2463 mz_mysql_util::TunnelConfig::Direct {
2464 resolved_ips: Some(resolved),
2465 }
2466 }
2467 Tunnel::Ssh(SshTunnel {
2468 connection_id,
2469 connection,
2470 }) => {
2471 let secret = secrets_reader
2472 .read_in_task_if(in_task, *connection_id)
2473 .await?;
2474 let key_pair = SshKeyPair::from_bytes(&secret)?;
2475 let resolved = resolve_address(
2477 &connection.host,
2478 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2479 )
2480 .await?;
2481 mz_mysql_util::TunnelConfig::Ssh {
2482 config: SshTunnelConfig {
2483 host: resolved
2484 .iter()
2485 .map(|a| a.to_string())
2486 .collect::<BTreeSet<_>>(),
2487 port: connection.port,
2488 user: connection.user.clone(),
2489 key_pair,
2490 },
2491 }
2492 }
2493 Tunnel::AwsPrivatelink(connection) => {
2494 assert_none!(connection.port);
2495 mz_mysql_util::TunnelConfig::AwsPrivatelink {
2496 connection_id: connection.connection_id,
2497 }
2498 }
2499 Tunnel::AwsPrivatelinks(_) => {
2500 unreachable!("MATCHING broker rules are only available for Kafka connections.");
2501 }
2502 };
2503
2504 let aws_config = match self.aws_connection.as_ref() {
2505 None => None,
2506 Some(aws_ref) => Some(
2507 aws_ref
2508 .connection
2509 .load_sdk_config(
2510 &storage_configuration.connection_context,
2511 aws_ref.connection_id,
2512 in_task,
2513 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2514 )
2515 .await?,
2516 ),
2517 };
2518
2519 Ok(mz_mysql_util::Config::new(
2520 opts,
2521 tunnel,
2522 storage_configuration.parameters.ssh_timeout_config,
2523 in_task,
2524 storage_configuration
2525 .parameters
2526 .mysql_source_timeouts
2527 .clone(),
2528 aws_config,
2529 )?)
2530 }
2531
2532 pub async fn validate(
2533 &self,
2534 _id: CatalogItemId,
2535 storage_configuration: &StorageConfiguration,
2536 ) -> Result<MySqlConn, MySqlConnectionValidationError> {
2537 let config = self
2538 .config(
2539 &storage_configuration.connection_context.secrets_reader,
2540 storage_configuration,
2541 InTask::No,
2543 )
2544 .await?;
2545 let mut conn = config
2546 .connect(
2547 "connection validation",
2548 &storage_configuration.connection_context.ssh_tunnel_manager,
2549 )
2550 .await?;
2551
2552 let mut setting_errors = vec![];
2554 let gtid_res = mz_mysql_util::ensure_gtid_consistency(&mut conn).await;
2555 let binlog_res = mz_mysql_util::ensure_full_row_binlog_format(&mut conn).await;
2556 let order_res = mz_mysql_util::ensure_replication_commit_order(&mut conn).await;
2557 for res in [gtid_res, binlog_res, order_res] {
2558 match res {
2559 Err(MySqlError::InvalidSystemSetting {
2560 setting,
2561 expected,
2562 actual,
2563 }) => {
2564 setting_errors.push((setting, expected, actual));
2565 }
2566 Err(err) => Err(err)?,
2567 Ok(()) => {}
2568 }
2569 }
2570 if !setting_errors.is_empty() {
2571 Err(MySqlConnectionValidationError::ReplicationSettingsError(
2572 setting_errors,
2573 ))?;
2574 }
2575
2576 Ok(conn)
2577 }
2578}
2579
/// Errors produced while validating a [`MySqlConnection`].
#[derive(Debug, thiserror::Error)]
pub enum MySqlConnectionValidationError {
    /// One or more system settings required for replication are wrong.
    ///
    /// Each entry is `(setting, expected, actual)`.
    #[error("Invalid MySQL system replication settings")]
    ReplicationSettingsError(Vec<(String, String, String)>),
    /// An error reported by the MySQL client utilities.
    #[error(transparent)]
    Client(#[from] MySqlError),
    /// Any other error; displayed together with its chain of causes.
    #[error("{}", .0.display_with_causes())]
    Other(#[from] anyhow::Error),
}
2589
2590impl MySqlConnectionValidationError {
2591 pub fn detail(&self) -> Option<String> {
2592 match self {
2593 Self::ReplicationSettingsError(settings) => Some(format!(
2594 "Invalid MySQL system replication settings: {}",
2595 itertools::join(
2596 settings.iter().map(|(setting, expected, actual)| format!(
2597 "{}: expected {}, got {}",
2598 setting, expected, actual
2599 )),
2600 "; "
2601 )
2602 )),
2603 _ => None,
2604 }
2605 }
2606
2607 pub fn hint(&self) -> Option<String> {
2608 match self {
2609 Self::ReplicationSettingsError(_) => {
2610 Some("Set the necessary MySQL database system settings.".into())
2611 }
2612 _ => None,
2613 }
2614 }
2615}
2616
2617impl<C: ConnectionAccess> AlterCompatible for MySqlConnection<C> {
2618 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2619 let MySqlConnection {
2620 tunnel,
2621 host: _,
2623 port: _,
2624 user: _,
2625 password: _,
2626 tls_mode: _,
2627 tls_root_cert: _,
2628 tls_identity: _,
2629 aws_connection: _,
2630 } = self;
2631
2632 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2633
2634 for (compatible, field) in compatibility_checks {
2635 if !compatible {
2636 tracing::warn!(
2637 "MySqlConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2638 self,
2639 other
2640 );
2641
2642 return Err(AlterError { id });
2643 }
2644 }
2645 Ok(())
2646 }
2647}
2648
/// Connection details for a SQL Server instance.
///
/// `C` selects whether the SSH connection used by the tunnel is held by
/// reference or inlined.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SqlServerConnectionDetails<C: ConnectionAccess = InlinedConnection> {
    /// Hostname or address of the SQL Server instance.
    pub host: String,
    /// TCP port of the SQL Server instance.
    pub port: u16,
    /// Database to connect to.
    pub database: String,
    /// Username, given either inline or as a secret.
    pub user: StringOrSecret,
    /// ID of the secret holding the password.
    pub password: CatalogItemId,
    /// How the server is reached (directly, over SSH, or via AWS PrivateLink).
    pub tunnel: Tunnel<C>,
    /// Encryption level passed to the tiberius client config.
    pub encryption: mz_sql_server_util::config::EncryptionLevel,
    /// How the server's TLS certificate is validated.
    pub certificate_validation_policy: mz_sql_server_util::config::CertificateValidationPolicy,
    /// CA certificate (PEM); consulted when `certificate_validation_policy` is
    /// `VerifyCA`.
    pub tls_root_cert: Option<StringOrSecret>,
}
2676
2677impl<C: ConnectionAccess> SqlServerConnectionDetails<C> {
2678 fn validate_by_default(&self) -> bool {
2679 true
2680 }
2681}
2682
2683impl SqlServerConnectionDetails<InlinedConnection> {
2684 pub async fn validate(
2686 &self,
2687 _id: CatalogItemId,
2688 storage_configuration: &StorageConfiguration,
2689 ) -> Result<mz_sql_server_util::Client, anyhow::Error> {
2690 let config = self
2691 .resolve_config(
2692 &storage_configuration.connection_context.secrets_reader,
2693 storage_configuration,
2694 InTask::No,
2695 )
2696 .await?;
2697 tracing::debug!(?config, "Validating SQL Server connection");
2698
2699 let mut client = mz_sql_server_util::Client::connect(config).await?;
2700
2701 let mut replication_errors = vec![];
2706 for error in [
2707 mz_sql_server_util::inspect::ensure_database_cdc_enabled(&mut client).await,
2708 mz_sql_server_util::inspect::ensure_snapshot_isolation_enabled(&mut client).await,
2709 mz_sql_server_util::inspect::ensure_sql_server_agent_running(&mut client).await,
2710 ] {
2711 match error {
2712 Err(mz_sql_server_util::SqlServerError::InvalidSystemSetting {
2713 name,
2714 expected,
2715 actual,
2716 }) => replication_errors.push((name, expected, actual)),
2717 Err(other) => Err(other)?,
2718 Ok(()) => (),
2719 }
2720 }
2721 if !replication_errors.is_empty() {
2722 Err(SqlServerConnectionValidationError::ReplicationSettingsError(replication_errors))?;
2723 }
2724
2725 Ok(client)
2726 }
2727
2728 pub async fn resolve_config(
2738 &self,
2739 secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
2740 storage_configuration: &StorageConfiguration,
2741 in_task: InTask,
2742 ) -> Result<mz_sql_server_util::Config, anyhow::Error> {
2743 let dyncfg = storage_configuration.config_set();
2744 let mut inner_config = tiberius::Config::new();
2745
2746 inner_config.host(&self.host);
2748 inner_config.port(self.port);
2749 inner_config.database(self.database.clone());
2750 inner_config.encryption(self.encryption.into());
2751 match self.certificate_validation_policy {
2752 mz_sql_server_util::config::CertificateValidationPolicy::TrustAll => {
2753 inner_config.trust_cert()
2754 }
2755 mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA => {
2756 inner_config.trust_cert_ca_pem(
2757 self.tls_root_cert
2758 .as_ref()
2759 .unwrap()
2760 .get_string(in_task, secrets_reader)
2761 .await
2762 .context("ca certificate")?,
2763 );
2764 }
2765 mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem => (), }
2767
2768 inner_config.application_name("materialize");
2769
2770 let user = self
2772 .user
2773 .get_string(in_task, secrets_reader)
2774 .await
2775 .context("username")?;
2776 let password = secrets_reader
2777 .read_string_in_task_if(in_task, self.password)
2778 .await
2779 .context("password")?;
2780 inner_config.authentication(tiberius::AuthMethod::sql_server(user, password));
2783
2784 let enforce_external_addresses = ENFORCE_EXTERNAL_ADDRESSES.get(dyncfg);
2787
2788 let tunnel = match &self.tunnel {
2789 Tunnel::Direct => {
2790 let resolved_addresses: Vec<SocketAddr> =
2791 resolve_address(&self.host, enforce_external_addresses)
2792 .await?
2793 .into_iter()
2794 .map(|ip| SocketAddr::new(ip, self.port))
2795 .collect();
2796 mz_sql_server_util::config::TunnelConfig::Direct {
2797 resolved_addresses: resolved_addresses.into_boxed_slice(),
2798 }
2799 }
2800 Tunnel::Ssh(SshTunnel {
2801 connection_id,
2802 connection: ssh_connection,
2803 }) => {
2804 let secret = secrets_reader
2805 .read_in_task_if(in_task, *connection_id)
2806 .await
2807 .context("ssh secret")?;
2808 let key_pair = SshKeyPair::from_bytes(&secret).context("ssh key pair")?;
2809 let addresses = resolve_address(&ssh_connection.host, enforce_external_addresses)
2812 .await
2813 .context("ssh tunnel")?;
2814
2815 let config = SshTunnelConfig {
2816 host: addresses.into_iter().map(|a| a.to_string()).collect(),
2817 port: ssh_connection.port,
2818 user: ssh_connection.user.clone(),
2819 key_pair,
2820 };
2821 mz_sql_server_util::config::TunnelConfig::Ssh {
2822 config,
2823 manager: storage_configuration
2824 .connection_context
2825 .ssh_tunnel_manager
2826 .clone(),
2827 timeout: storage_configuration.parameters.ssh_timeout_config.clone(),
2828 host: self.host.clone(),
2829 port: self.port,
2830 }
2831 }
2832 Tunnel::AwsPrivatelink(private_link_connection) => {
2833 assert_none!(private_link_connection.port);
2834 mz_sql_server_util::config::TunnelConfig::AwsPrivatelink {
2835 connection_id: private_link_connection.connection_id,
2836 port: self.port,
2837 }
2838 }
2839 Tunnel::AwsPrivatelinks(_) => {
2840 unreachable!("MATCHING broker rules are only available for Kafka connections.");
2841 }
2842 };
2843
2844 Ok(mz_sql_server_util::Config::new(
2845 inner_config,
2846 tunnel,
2847 in_task,
2848 ))
2849 }
2850}
2851
/// Errors produced while validating a [`SqlServerConnectionDetails`].
#[derive(Debug, Clone, thiserror::Error)]
pub enum SqlServerConnectionValidationError {
    /// One or more system settings required for replication are wrong.
    ///
    /// Each entry is `(name, expected, actual)`.
    #[error("Invalid SQL Server system replication settings")]
    ReplicationSettingsError(Vec<(String, String, String)>),
}
2857
2858impl SqlServerConnectionValidationError {
2859 pub fn detail(&self) -> Option<String> {
2860 match self {
2861 Self::ReplicationSettingsError(settings) => Some(format!(
2862 "Invalid SQL Server system replication settings: {}",
2863 itertools::join(
2864 settings.iter().map(|(setting, expected, actual)| format!(
2865 "{}: expected {}, got {}",
2866 setting, expected, actual
2867 )),
2868 "; "
2869 )
2870 )),
2871 }
2872 }
2873
2874 pub fn hint(&self) -> Option<String> {
2875 match self {
2876 _ => None,
2877 }
2878 }
2879}
2880
2881impl<R: ConnectionResolver> IntoInlineConnection<SqlServerConnectionDetails, R>
2882 for SqlServerConnectionDetails<ReferencedConnection>
2883{
2884 fn into_inline_connection(self, r: R) -> SqlServerConnectionDetails {
2885 let SqlServerConnectionDetails {
2886 host,
2887 port,
2888 database,
2889 user,
2890 password,
2891 tunnel,
2892 encryption,
2893 certificate_validation_policy,
2894 tls_root_cert,
2895 } = self;
2896
2897 SqlServerConnectionDetails {
2898 host,
2899 port,
2900 database,
2901 user,
2902 password,
2903 tunnel: tunnel.into_inline_connection(&r),
2904 encryption,
2905 certificate_validation_policy,
2906 tls_root_cert,
2907 }
2908 }
2909}
2910
2911impl<C: ConnectionAccess> AlterCompatible for SqlServerConnectionDetails<C> {
2912 fn alter_compatible(
2913 &self,
2914 id: mz_repr::GlobalId,
2915 other: &Self,
2916 ) -> Result<(), crate::controller::AlterError> {
2917 let SqlServerConnectionDetails {
2918 tunnel,
2919 host: _,
2921 port: _,
2922 database: _,
2923 user: _,
2924 password: _,
2925 encryption: _,
2926 certificate_validation_policy: _,
2927 tls_root_cert: _,
2928 } = self;
2929
2930 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2931
2932 for (compatible, field) in compatibility_checks {
2933 if !compatible {
2934 tracing::warn!(
2935 "SqlServerConnectionDetails incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2936 self,
2937 other
2938 );
2939
2940 return Err(AlterError { id });
2941 }
2942 }
2943 Ok(())
2944 }
2945}
2946
/// Connection details for an SSH server through which other connections can be
/// tunneled.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SshConnection {
    /// Hostname or address of the SSH server; resolved before connecting.
    pub host: String,
    /// Port of the SSH server.
    pub port: u16,
    /// Username to authenticate as.
    pub user: String,
}
2954
2955use self::inline::{
2956 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
2957 ReferencedConnection,
2958};
2959
2960impl AlterCompatible for SshConnection {
2961 fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
2962 Ok(())
2964 }
2965}
2966
/// A tunnel through an AWS PrivateLink connection.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelink {
    /// ID of the AWS PrivateLink connection to tunnel through.
    pub connection_id: CatalogItemId,
    /// Optional availability zone.
    ///
    /// NOTE(review): presumably selects a zonal endpoint; confirm.
    pub availability_zone: Option<String>,
    /// Optional port override.
    ///
    /// The MySQL and SQL Server config builders assert that this is `None`.
    pub port: Option<u16>,
}
2978
2979impl AlterCompatible for AwsPrivatelink {
2980 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2981 let AwsPrivatelink {
2982 connection_id,
2983 availability_zone: _,
2984 port: _,
2985 } = self;
2986
2987 let compatibility_checks = [(connection_id == &other.connection_id, "connection_id")];
2988
2989 for (compatible, field) in compatibility_checks {
2990 if !compatible {
2991 tracing::warn!(
2992 "AwsPrivatelink incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2993 self,
2994 other
2995 );
2996
2997 return Err(AlterError { id });
2998 }
2999 }
3000
3001 Ok(())
3002 }
3003}
3004
/// A set of rules mapping matched brokers to AWS PrivateLink tunnels.
///
/// This is the form used by MATCHING broker rules, which are only available for
/// Kafka connections.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelinks {
    /// The rules.
    ///
    /// NOTE(review): presumably evaluated in order; confirm against the Kafka
    /// matching code.
    pub rules: Vec<AwsPrivatelinkRule>,
}
3012
/// A single MATCHING rule: traffic matching `pattern` is routed through `to`.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct AwsPrivatelinkRule {
    /// The pattern this rule matches.
    ///
    /// NOTE(review): presumably matched against broker addresses; confirm.
    pub pattern: ConnectionRulePattern,
    /// The PrivateLink tunnel used when the pattern matches.
    pub to: AwsPrivatelink,
}
3020
/// A tunnel through an SSH connection.
///
/// `C` selects whether the SSH connection is held by reference or inlined.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SshTunnel<C: ConnectionAccess = InlinedConnection> {
    /// ID of the SSH connection.
    ///
    /// Its secret holds the SSH key pair read when connecting.
    pub connection_id: CatalogItemId,
    /// The SSH connection itself, referenced or inlined depending on `C`.
    pub connection: C::Ssh,
}
3029
3030impl<R: ConnectionResolver> IntoInlineConnection<SshTunnel, R> for SshTunnel<ReferencedConnection> {
3031 fn into_inline_connection(self, r: R) -> SshTunnel {
3032 let SshTunnel {
3033 connection,
3034 connection_id,
3035 } = self;
3036
3037 SshTunnel {
3038 connection: r.resolve_connection(connection).unwrap_ssh(),
3039 connection_id,
3040 }
3041 }
3042}
3043
3044impl SshTunnel<InlinedConnection> {
3045 async fn connect(
3048 &self,
3049 storage_configuration: &StorageConfiguration,
3050 remote_host: &str,
3051 remote_port: u16,
3052 in_task: InTask,
3053 ) -> Result<ManagedSshTunnelHandle, anyhow::Error> {
3054 let resolved = resolve_address(
3056 &self.connection.host,
3057 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
3058 )
3059 .await?;
3060 storage_configuration
3061 .connection_context
3062 .ssh_tunnel_manager
3063 .connect(
3064 SshTunnelConfig {
3065 host: resolved
3066 .iter()
3067 .map(|a| a.to_string())
3068 .collect::<BTreeSet<_>>(),
3069 port: self.connection.port,
3070 user: self.connection.user.clone(),
3071 key_pair: SshKeyPair::from_bytes(
3072 &storage_configuration
3073 .connection_context
3074 .secrets_reader
3075 .read_in_task_if(in_task, self.connection_id)
3076 .await?,
3077 )?,
3078 },
3079 remote_host,
3080 remote_port,
3081 storage_configuration.parameters.ssh_timeout_config,
3082 in_task,
3083 )
3084 .await
3085 }
3086}
3087
3088impl<C: ConnectionAccess> AlterCompatible for SshTunnel<C> {
3089 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
3090 let SshTunnel {
3091 connection_id,
3092 connection,
3093 } = self;
3094
3095 let compatibility_checks = [
3096 (connection_id == &other.connection_id, "connection_id"),
3097 (
3098 connection.alter_compatible(id, &other.connection).is_ok(),
3099 "connection",
3100 ),
3101 ];
3102
3103 for (compatible, field) in compatibility_checks {
3104 if !compatible {
3105 tracing::warn!(
3106 "SshTunnel incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
3107 self,
3108 other
3109 );
3110
3111 return Err(AlterError { id });
3112 }
3113 }
3114
3115 Ok(())
3116 }
3117}
3118
3119impl SshConnection {
3120 #[allow(clippy::unused_async)]
3121 async fn validate(
3122 &self,
3123 id: CatalogItemId,
3124 storage_configuration: &StorageConfiguration,
3125 ) -> Result<(), anyhow::Error> {
3126 let secret = storage_configuration
3127 .connection_context
3128 .secrets_reader
3129 .read_in_task_if(
3130 InTask::No,
3132 id,
3133 )
3134 .await?;
3135 let key_pair = SshKeyPair::from_bytes(&secret)?;
3136
3137 let resolved = resolve_address(
3139 &self.host,
3140 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
3141 )
3142 .await?;
3143
3144 let config = SshTunnelConfig {
3145 host: resolved
3146 .iter()
3147 .map(|a| a.to_string())
3148 .collect::<BTreeSet<_>>(),
3149 port: self.port,
3150 user: self.user.clone(),
3151 key_pair,
3152 };
3153 config
3156 .validate(storage_configuration.parameters.ssh_timeout_config)
3157 .await
3158 }
3159
3160 fn validate_by_default(&self) -> bool {
3161 false
3162 }
3163}
3164
3165impl AwsPrivatelinkConnection {
3166 #[allow(clippy::unused_async)]
3167 async fn validate(
3168 &self,
3169 id: CatalogItemId,
3170 storage_configuration: &StorageConfiguration,
3171 ) -> Result<(), anyhow::Error> {
3172 let Some(ref cloud_resource_reader) = storage_configuration
3173 .connection_context
3174 .cloud_resource_reader
3175 else {
3176 return Err(anyhow!("AWS PrivateLink connections are unsupported"));
3177 };
3178
3179 let status = cloud_resource_reader.read(id).await?;
3181
3182 let availability = status
3183 .conditions
3184 .as_ref()
3185 .and_then(|conditions| conditions.iter().find(|c| c.type_ == "Available"));
3186
3187 match availability {
3188 Some(condition) if condition.status == "True" => Ok(()),
3189 Some(condition) => Err(anyhow!("{}", condition.message)),
3190 None => Err(anyhow!("Endpoint availability is unknown")),
3191 }
3192 }
3193
3194 fn validate_by_default(&self) -> bool {
3195 false
3196 }
3197}