1use std::borrow::Cow;
13use std::collections::{BTreeMap, BTreeSet};
14use std::net::SocketAddr;
15use std::sync::Arc;
16
17use anyhow::{Context, anyhow};
18use iceberg::{Catalog, CatalogBuilder};
19use iceberg_catalog_rest::{
20 REST_CATALOG_PROP_URI, REST_CATALOG_PROP_WAREHOUSE, RestCatalogBuilder,
21};
22use itertools::Itertools;
23use mz_ccsr::tls::{Certificate, Identity};
24use mz_cloud_resources::{AwsExternalIdPrefix, CloudResourceReader, vpc_endpoint_host};
25use mz_dyncfg::ConfigSet;
26use mz_kafka_util::client::{
27 BrokerAddr, BrokerRewrite, MzClientContext, MzKafkaError, TunnelConfig, TunnelingClientContext,
28};
29use mz_mysql_util::{MySqlConn, MySqlError};
30use mz_ore::assert_none;
31use mz_ore::error::ErrorExt;
32use mz_ore::future::{InTask, OreFutureExt};
33use mz_ore::netio::DUMMY_DNS_PORT;
34use mz_ore::netio::resolve_address;
35use mz_ore::num::NonNeg;
36use mz_repr::{CatalogItemId, GlobalId};
37use mz_secrets::SecretsReader;
38use mz_ssh_util::keys::SshKeyPair;
39use mz_ssh_util::tunnel::SshTunnelConfig;
40use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager};
41use mz_tls_util::Pkcs12Archive;
42use mz_tracing::CloneableEnvFilter;
43use rdkafka::ClientContext;
44use rdkafka::config::FromClientConfigAndContext;
45use rdkafka::consumer::{BaseConsumer, Consumer};
46use regex::Regex;
47use serde::{Deserialize, Deserializer, Serialize};
48use tokio::net;
49use tokio::runtime::Handle;
50use tokio_postgres::config::SslMode;
51use tracing::{debug, warn};
52use url::Url;
53
54use crate::AlterCompatible;
55use crate::configuration::StorageConfiguration;
56use crate::connections::aws::{
57 AwsConnection, AwsConnectionReference, AwsConnectionValidationError,
58};
59use crate::connections::string_or_secret::StringOrSecret;
60use crate::controller::AlterError;
61use crate::dyncfgs::{
62 ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES,
63 KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM, KAFKA_RECONNECT_BACKOFF,
64 KAFKA_RECONNECT_BACKOFF_MAX, KAFKA_RETRY_BACKOFF, KAFKA_RETRY_BACKOFF_MAX,
65};
66use crate::errors::{ContextCreationError, CsrConnectError};
67
68pub mod aws;
69pub mod inline;
70pub mod string_or_secret;
71
72const REST_CATALOG_PROP_SCOPE: &str = "scope";
73const REST_CATALOG_PROP_CREDENTIAL: &str = "credential";
74
75#[async_trait::async_trait]
77trait SecretsReaderExt {
78 async fn read_in_task_if(
80 &self,
81 in_task: InTask,
82 id: CatalogItemId,
83 ) -> Result<Vec<u8>, anyhow::Error>;
84
85 async fn read_string_in_task_if(
87 &self,
88 in_task: InTask,
89 id: CatalogItemId,
90 ) -> Result<String, anyhow::Error>;
91}
92
93#[async_trait::async_trait]
94impl SecretsReaderExt for Arc<dyn SecretsReader> {
95 async fn read_in_task_if(
96 &self,
97 in_task: InTask,
98 id: CatalogItemId,
99 ) -> Result<Vec<u8>, anyhow::Error> {
100 let sr = Arc::clone(self);
101 async move { sr.read(id).await }
102 .run_in_task_if(in_task, || "secrets_reader_read".to_string())
103 .await
104 }
105 async fn read_string_in_task_if(
106 &self,
107 in_task: InTask,
108 id: CatalogItemId,
109 ) -> Result<String, anyhow::Error> {
110 let sr = Arc::clone(self);
111 async move { sr.read_string(id).await }
112 .run_in_task_if(in_task, || "secrets_reader_read".to_string())
113 .await
114 }
115}
116
117#[derive(Debug, Clone)]
122pub struct ConnectionContext {
123 pub environment_id: String,
130 pub librdkafka_log_level: tracing::Level,
132 pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
134 pub aws_connection_role_arn: Option<String>,
137 pub secrets_reader: Arc<dyn SecretsReader>,
139 pub cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
141 pub ssh_tunnel_manager: SshTunnelManager,
143}
144
145impl ConnectionContext {
146 pub fn from_cli_args(
154 environment_id: String,
155 startup_log_level: &CloneableEnvFilter,
156 aws_external_id_prefix: Option<AwsExternalIdPrefix>,
157 aws_connection_role_arn: Option<String>,
158 secrets_reader: Arc<dyn SecretsReader>,
159 cloud_resource_reader: Option<Arc<dyn CloudResourceReader>>,
160 ) -> ConnectionContext {
161 ConnectionContext {
162 environment_id,
163 librdkafka_log_level: mz_ore::tracing::crate_level(
164 &startup_log_level.clone().into(),
165 "librdkafka",
166 ),
167 aws_external_id_prefix,
168 aws_connection_role_arn,
169 secrets_reader,
170 cloud_resource_reader,
171 ssh_tunnel_manager: SshTunnelManager::default(),
172 }
173 }
174
175 pub fn for_tests(secrets_reader: Arc<dyn SecretsReader>) -> ConnectionContext {
177 ConnectionContext {
178 environment_id: "test-environment-id".into(),
179 librdkafka_log_level: tracing::Level::INFO,
180 aws_external_id_prefix: Some(
181 AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable(
182 "test-aws-external-id-prefix",
183 )
184 .expect("infallible"),
185 ),
186 aws_connection_role_arn: Some(
187 "arn:aws:iam::123456789000:role/MaterializeConnection".into(),
188 ),
189 secrets_reader,
190 cloud_resource_reader: None,
191 ssh_tunnel_manager: SshTunnelManager::default(),
192 }
193 }
194}
195
196#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
197pub enum Connection<C: ConnectionAccess = InlinedConnection> {
198 Kafka(KafkaConnection<C>),
199 Csr(CsrConnection<C>),
200 Postgres(PostgresConnection<C>),
201 Ssh(SshConnection),
202 Aws(AwsConnection),
203 AwsPrivatelink(AwsPrivatelinkConnection),
204 MySql(MySqlConnection<C>),
205 SqlServer(SqlServerConnectionDetails<C>),
206 IcebergCatalog(IcebergCatalogConnection<C>),
207}
208
209impl<R: ConnectionResolver> IntoInlineConnection<Connection, R>
210 for Connection<ReferencedConnection>
211{
212 fn into_inline_connection(self, r: R) -> Connection {
213 match self {
214 Connection::Kafka(kafka) => Connection::Kafka(kafka.into_inline_connection(r)),
215 Connection::Csr(csr) => Connection::Csr(csr.into_inline_connection(r)),
216 Connection::Postgres(pg) => Connection::Postgres(pg.into_inline_connection(r)),
217 Connection::Ssh(ssh) => Connection::Ssh(ssh),
218 Connection::Aws(aws) => Connection::Aws(aws),
219 Connection::AwsPrivatelink(awspl) => Connection::AwsPrivatelink(awspl),
220 Connection::MySql(mysql) => Connection::MySql(mysql.into_inline_connection(r)),
221 Connection::SqlServer(sql_server) => {
222 Connection::SqlServer(sql_server.into_inline_connection(r))
223 }
224 Connection::IcebergCatalog(iceberg) => {
225 Connection::IcebergCatalog(iceberg.into_inline_connection(r))
226 }
227 }
228 }
229}
230
231impl<C: ConnectionAccess> Connection<C> {
232 pub fn validate_by_default(&self) -> bool {
234 match self {
235 Connection::Kafka(conn) => conn.validate_by_default(),
236 Connection::Csr(conn) => conn.validate_by_default(),
237 Connection::Postgres(conn) => conn.validate_by_default(),
238 Connection::Ssh(conn) => conn.validate_by_default(),
239 Connection::Aws(conn) => conn.validate_by_default(),
240 Connection::AwsPrivatelink(conn) => conn.validate_by_default(),
241 Connection::MySql(conn) => conn.validate_by_default(),
242 Connection::SqlServer(conn) => conn.validate_by_default(),
243 Connection::IcebergCatalog(conn) => conn.validate_by_default(),
244 }
245 }
246}
247
248impl Connection<InlinedConnection> {
249 pub async fn validate(
251 &self,
252 id: CatalogItemId,
253 storage_configuration: &StorageConfiguration,
254 ) -> Result<(), ConnectionValidationError> {
255 match self {
256 Connection::Kafka(conn) => conn.validate(id, storage_configuration).await?,
257 Connection::Csr(conn) => conn.validate(id, storage_configuration).await?,
258 Connection::Postgres(conn) => {
259 conn.validate(id, storage_configuration).await?;
260 }
261 Connection::Ssh(conn) => conn.validate(id, storage_configuration).await?,
262 Connection::Aws(conn) => conn.validate(id, storage_configuration).await?,
263 Connection::AwsPrivatelink(conn) => conn.validate(id, storage_configuration).await?,
264 Connection::MySql(conn) => {
265 conn.validate(id, storage_configuration).await?;
266 }
267 Connection::SqlServer(conn) => {
268 conn.validate(id, storage_configuration).await?;
269 }
270 Connection::IcebergCatalog(conn) => conn.validate(id, storage_configuration).await?,
271 }
272 Ok(())
273 }
274
275 pub fn unwrap_kafka(self) -> <InlinedConnection as ConnectionAccess>::Kafka {
276 match self {
277 Self::Kafka(conn) => conn,
278 o => unreachable!("{o:?} is not a Kafka connection"),
279 }
280 }
281
282 pub fn unwrap_pg(self) -> <InlinedConnection as ConnectionAccess>::Pg {
283 match self {
284 Self::Postgres(conn) => conn,
285 o => unreachable!("{o:?} is not a Postgres connection"),
286 }
287 }
288
289 pub fn unwrap_mysql(self) -> <InlinedConnection as ConnectionAccess>::MySql {
290 match self {
291 Self::MySql(conn) => conn,
292 o => unreachable!("{o:?} is not a MySQL connection"),
293 }
294 }
295
296 pub fn unwrap_sql_server(self) -> <InlinedConnection as ConnectionAccess>::SqlServer {
297 match self {
298 Self::SqlServer(conn) => conn,
299 o => unreachable!("{o:?} is not a SQL Server connection"),
300 }
301 }
302
303 pub fn unwrap_aws(self) -> <InlinedConnection as ConnectionAccess>::Aws {
304 match self {
305 Self::Aws(conn) => conn,
306 o => unreachable!("{o:?} is not an AWS connection"),
307 }
308 }
309
310 pub fn unwrap_ssh(self) -> <InlinedConnection as ConnectionAccess>::Ssh {
311 match self {
312 Self::Ssh(conn) => conn,
313 o => unreachable!("{o:?} is not an SSH connection"),
314 }
315 }
316
317 pub fn unwrap_csr(self) -> <InlinedConnection as ConnectionAccess>::Csr {
318 match self {
319 Self::Csr(conn) => conn,
320 o => unreachable!("{o:?} is not a Kafka connection"),
321 }
322 }
323
324 pub fn unwrap_iceberg_catalog(self) -> <InlinedConnection as ConnectionAccess>::IcebergCatalog {
325 match self {
326 Self::IcebergCatalog(conn) => conn,
327 o => unreachable!("{o:?} is not an Iceberg catalog connection"),
328 }
329 }
330}
331
332#[derive(thiserror::Error, Debug)]
334pub enum ConnectionValidationError {
335 #[error(transparent)]
336 Postgres(#[from] PostgresConnectionValidationError),
337 #[error(transparent)]
338 MySql(#[from] MySqlConnectionValidationError),
339 #[error(transparent)]
340 SqlServer(#[from] SqlServerConnectionValidationError),
341 #[error(transparent)]
342 Aws(#[from] AwsConnectionValidationError),
343 #[error("{}", .0.display_with_causes())]
344 Other(#[from] anyhow::Error),
345}
346
347impl ConnectionValidationError {
348 pub fn detail(&self) -> Option<String> {
350 match self {
351 ConnectionValidationError::Postgres(e) => e.detail(),
352 ConnectionValidationError::MySql(e) => e.detail(),
353 ConnectionValidationError::SqlServer(e) => e.detail(),
354 ConnectionValidationError::Aws(e) => e.detail(),
355 ConnectionValidationError::Other(_) => None,
356 }
357 }
358
359 pub fn hint(&self) -> Option<String> {
361 match self {
362 ConnectionValidationError::Postgres(e) => e.hint(),
363 ConnectionValidationError::MySql(e) => e.hint(),
364 ConnectionValidationError::SqlServer(e) => e.hint(),
365 ConnectionValidationError::Aws(e) => e.hint(),
366 ConnectionValidationError::Other(_) => None,
367 }
368 }
369}
370
371impl<C: ConnectionAccess> AlterCompatible for Connection<C> {
372 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
373 match (self, other) {
374 (Self::Aws(s), Self::Aws(o)) => s.alter_compatible(id, o),
375 (Self::AwsPrivatelink(s), Self::AwsPrivatelink(o)) => s.alter_compatible(id, o),
376 (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o),
377 (Self::Csr(s), Self::Csr(o)) => s.alter_compatible(id, o),
378 (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
379 (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
380 (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
381 _ => {
382 tracing::warn!(
383 "Connection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
384 self,
385 other
386 );
387 Err(AlterError { id })
388 }
389 }
390 }
391}
392
393#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
394pub struct RestIcebergCatalog {
395 pub credential: StringOrSecret,
397 pub scope: Option<String>,
399 pub warehouse: Option<String>,
401}
402
403#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
404pub struct S3TablesRestIcebergCatalog<C: ConnectionAccess = InlinedConnection> {
405 pub aws_connection: AwsConnectionReference<C>,
407 pub warehouse: String,
409}
410
411impl<R: ConnectionResolver> IntoInlineConnection<S3TablesRestIcebergCatalog, R>
412 for S3TablesRestIcebergCatalog<ReferencedConnection>
413{
414 fn into_inline_connection(self, r: R) -> S3TablesRestIcebergCatalog {
415 S3TablesRestIcebergCatalog {
416 aws_connection: self.aws_connection.into_inline_connection(&r),
417 warehouse: self.warehouse,
418 }
419 }
420}
421
422#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
423pub enum IcebergCatalogType {
424 Rest,
425 S3TablesRest,
426}
427
428#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
429pub enum IcebergCatalogImpl<C: ConnectionAccess = InlinedConnection> {
430 Rest(RestIcebergCatalog),
431 S3TablesRest(S3TablesRestIcebergCatalog<C>),
432}
433
434impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogImpl, R>
435 for IcebergCatalogImpl<ReferencedConnection>
436{
437 fn into_inline_connection(self, r: R) -> IcebergCatalogImpl {
438 match self {
439 IcebergCatalogImpl::Rest(rest) => IcebergCatalogImpl::Rest(rest),
440 IcebergCatalogImpl::S3TablesRest(s3tables) => {
441 IcebergCatalogImpl::S3TablesRest(s3tables.into_inline_connection(r))
442 }
443 }
444 }
445}
446
447#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
448pub struct IcebergCatalogConnection<C: ConnectionAccess = InlinedConnection> {
449 pub catalog: IcebergCatalogImpl<C>,
451 pub uri: reqwest::Url,
453}
454
455impl AlterCompatible for IcebergCatalogConnection {
456 fn alter_compatible(&self, id: GlobalId, _other: &Self) -> Result<(), AlterError> {
457 Err(AlterError { id })
458 }
459}
460
461impl<R: ConnectionResolver> IntoInlineConnection<IcebergCatalogConnection, R>
462 for IcebergCatalogConnection<ReferencedConnection>
463{
464 fn into_inline_connection(self, r: R) -> IcebergCatalogConnection {
465 IcebergCatalogConnection {
466 catalog: self.catalog.into_inline_connection(&r),
467 uri: self.uri,
468 }
469 }
470}
471
472impl<C: ConnectionAccess> IcebergCatalogConnection<C> {
473 fn validate_by_default(&self) -> bool {
474 true
475 }
476}
477
478impl IcebergCatalogConnection<InlinedConnection> {
479 pub async fn connect(
480 &self,
481 storage_configuration: &StorageConfiguration,
482 in_task: InTask,
483 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
484 match self.catalog {
485 IcebergCatalogImpl::S3TablesRest(ref s3tables) => {
486 self.connect_s3tables(s3tables, storage_configuration, in_task)
487 .await
488 }
489 IcebergCatalogImpl::Rest(ref rest) => {
490 self.connect_rest(rest, storage_configuration, in_task)
491 .await
492 }
493 }
494 }
495
496 async fn connect_s3tables(
497 &self,
498 s3tables: &S3TablesRestIcebergCatalog,
499 storage_configuration: &StorageConfiguration,
500 in_task: InTask,
501 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
502 let mut props = BTreeMap::from([(
503 REST_CATALOG_PROP_URI.to_string(),
504 self.uri.to_string().clone(),
505 )]);
506
507 let aws_ref = &s3tables.aws_connection;
508 let aws_config = aws_ref
509 .connection
510 .load_sdk_config(
511 &storage_configuration.connection_context,
512 aws_ref.connection_id,
513 in_task,
514 )
515 .await?;
516
517 props.insert(
518 REST_CATALOG_PROP_WAREHOUSE.to_string(),
519 s3tables.warehouse.clone(),
520 );
521
522 let catalog = RestCatalogBuilder::default()
523 .with_aws_client(aws_config)
524 .load("IcebergCatalog", props.into_iter().collect())
525 .await
526 .map_err(|e| anyhow!("failed to create Iceberg catalog: {e}"))?;
527 Ok(Arc::new(catalog))
528 }
529
530 async fn connect_rest(
531 &self,
532 rest: &RestIcebergCatalog,
533 storage_configuration: &StorageConfiguration,
534 in_task: InTask,
535 ) -> Result<Arc<dyn Catalog>, anyhow::Error> {
536 let mut props = BTreeMap::from([(
537 REST_CATALOG_PROP_URI.to_string(),
538 self.uri.to_string().clone(),
539 )]);
540
541 if let Some(warehouse) = &rest.warehouse {
542 props.insert(REST_CATALOG_PROP_WAREHOUSE.to_string(), warehouse.clone());
543 }
544
545 let credential = rest
546 .credential
547 .get_string(
548 in_task,
549 &storage_configuration.connection_context.secrets_reader,
550 )
551 .await
552 .map_err(|e| anyhow!("failed to read Iceberg catalog credential: {e}"))?;
553 props.insert(REST_CATALOG_PROP_CREDENTIAL.to_string(), credential);
554
555 if let Some(scope) = &rest.scope {
556 props.insert(REST_CATALOG_PROP_SCOPE.to_string(), scope.clone());
557 }
558
559 let catalog = RestCatalogBuilder::default()
560 .load("IcebergCatalog", props.into_iter().collect())
561 .await
562 .map_err(|e| anyhow!("failed to create Iceberg catalog: {e}"))?;
563 Ok(Arc::new(catalog))
564 }
565
566 async fn validate(
567 &self,
568 _id: CatalogItemId,
569 storage_configuration: &StorageConfiguration,
570 ) -> Result<(), ConnectionValidationError> {
571 let catalog = self
572 .connect(storage_configuration, InTask::No)
573 .await
574 .map_err(|e| {
575 ConnectionValidationError::Other(anyhow!("failed to connect to catalog: {e}"))
576 })?;
577
578 catalog.list_namespaces(None).await.map_err(|e| {
580 ConnectionValidationError::Other(anyhow!("failed to list namespaces: {e}"))
581 })?;
582
583 Ok(())
584 }
585}
586
587#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
588pub struct AwsPrivatelinkConnection {
589 pub service_name: String,
590 pub availability_zones: Vec<String>,
591}
592
593impl AlterCompatible for AwsPrivatelinkConnection {
594 fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
595 Ok(())
597 }
598}
599
600#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
601pub struct KafkaTlsConfig {
602 pub identity: Option<TlsIdentity>,
603 pub root_cert: Option<StringOrSecret>,
604}
605
606#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
607pub struct KafkaSaslConfig<C: ConnectionAccess = InlinedConnection> {
608 pub mechanism: String,
609 pub username: StringOrSecret,
610 pub password: Option<CatalogItemId>,
611 pub aws: Option<AwsConnectionReference<C>>,
612}
613
614impl<R: ConnectionResolver> IntoInlineConnection<KafkaSaslConfig, R>
615 for KafkaSaslConfig<ReferencedConnection>
616{
617 fn into_inline_connection(self, r: R) -> KafkaSaslConfig {
618 KafkaSaslConfig {
619 mechanism: self.mechanism,
620 username: self.username,
621 password: self.password,
622 aws: self.aws.map(|aws| aws.into_inline_connection(&r)),
623 }
624 }
625}
626
627#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
629pub struct KafkaBroker<C: ConnectionAccess = InlinedConnection> {
630 pub address: String,
632 pub tunnel: Tunnel<C>,
634}
635
636impl<R: ConnectionResolver> IntoInlineConnection<KafkaBroker, R>
637 for KafkaBroker<ReferencedConnection>
638{
639 fn into_inline_connection(self, r: R) -> KafkaBroker {
640 let KafkaBroker { address, tunnel } = self;
641 KafkaBroker {
642 address,
643 tunnel: tunnel.into_inline_connection(r),
644 }
645 }
646}
647
648#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Default)]
649pub struct KafkaTopicOptions {
650 pub replication_factor: Option<NonNeg<i32>>,
653 pub partition_count: Option<NonNeg<i32>>,
656 pub topic_config: BTreeMap<String, String>,
658}
659
660#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
661pub struct KafkaConnection<C: ConnectionAccess = InlinedConnection> {
662 pub brokers: Vec<KafkaBroker<C>>,
663 pub default_tunnel: Tunnel<C>,
667 pub progress_topic: Option<String>,
668 pub progress_topic_options: KafkaTopicOptions,
669 pub options: BTreeMap<String, StringOrSecret>,
670 pub tls: Option<KafkaTlsConfig>,
671 pub sasl: Option<KafkaSaslConfig<C>>,
672}
673
674impl<R: ConnectionResolver> IntoInlineConnection<KafkaConnection, R>
675 for KafkaConnection<ReferencedConnection>
676{
677 fn into_inline_connection(self, r: R) -> KafkaConnection {
678 let KafkaConnection {
679 brokers,
680 progress_topic,
681 progress_topic_options,
682 default_tunnel,
683 options,
684 tls,
685 sasl,
686 } = self;
687
688 let brokers = brokers
689 .into_iter()
690 .map(|broker| broker.into_inline_connection(&r))
691 .collect();
692
693 KafkaConnection {
694 brokers,
695 progress_topic,
696 progress_topic_options,
697 default_tunnel: default_tunnel.into_inline_connection(&r),
698 options,
699 tls,
700 sasl: sasl.map(|sasl| sasl.into_inline_connection(&r)),
701 }
702 }
703}
704
705impl<C: ConnectionAccess> KafkaConnection<C> {
706 pub fn progress_topic(
711 &self,
712 connection_context: &ConnectionContext,
713 connection_id: CatalogItemId,
714 ) -> Cow<'_, str> {
715 if let Some(progress_topic) = &self.progress_topic {
716 Cow::Borrowed(progress_topic)
717 } else {
718 Cow::Owned(format!(
719 "_materialize-progress-{}-{}",
720 connection_context.environment_id, connection_id,
721 ))
722 }
723 }
724
725 fn validate_by_default(&self) -> bool {
726 true
727 }
728}
729
730impl KafkaConnection {
731 pub fn id_base(
735 connection_context: &ConnectionContext,
736 connection_id: CatalogItemId,
737 object_id: GlobalId,
738 ) -> String {
739 format!(
740 "materialize-{}-{}-{}",
741 connection_context.environment_id, connection_id, object_id,
742 )
743 }
744
745 pub fn enrich_client_id(&self, configs: &ConfigSet, client_id: &mut String) {
748 #[derive(Debug, Deserialize)]
749 struct EnrichmentRule {
750 #[serde(deserialize_with = "deserialize_regex")]
751 pattern: Regex,
752 payload: String,
753 }
754
755 fn deserialize_regex<'de, D>(deserializer: D) -> Result<Regex, D::Error>
756 where
757 D: Deserializer<'de>,
758 {
759 let buf = String::deserialize(deserializer)?;
760 Regex::new(&buf).map_err(serde::de::Error::custom)
761 }
762
763 let rules = KAFKA_CLIENT_ID_ENRICHMENT_RULES.get(configs);
764 let rules = match serde_json::from_value::<Vec<EnrichmentRule>>(rules) {
765 Ok(rules) => rules,
766 Err(e) => {
767 warn!(%e, "failed to decode kafka_client_id_enrichment_rules");
768 return;
769 }
770 };
771
772 debug!(?self.brokers, "evaluating client ID enrichment rules");
777 for rule in rules {
778 let is_match = self
779 .brokers
780 .iter()
781 .any(|b| rule.pattern.is_match(&b.address));
782 debug!(?rule, is_match, "evaluated client ID enrichment rule");
783 if is_match {
784 client_id.push('-');
785 client_id.push_str(&rule.payload);
786 }
787 }
788 }
789
790 pub async fn create_with_context<C, T>(
792 &self,
793 storage_configuration: &StorageConfiguration,
794 context: C,
795 extra_options: &BTreeMap<&str, String>,
796 in_task: InTask,
797 ) -> Result<T, ContextCreationError>
798 where
799 C: ClientContext,
800 T: FromClientConfigAndContext<TunnelingClientContext<C>>,
801 {
802 let mut options = self.options.clone();
803
804 options.insert("allow.auto.create.topics".into(), "false".into());
809
810 let brokers = match &self.default_tunnel {
811 Tunnel::AwsPrivatelink(t) => {
812 assert!(&self.brokers.is_empty());
813
814 let algo = KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM
815 .get(storage_configuration.config_set());
816 options.insert("ssl.endpoint.identification.algorithm".into(), algo.into());
817
818 format!(
821 "{}:{}",
822 vpc_endpoint_host(
823 t.connection_id,
824 None, ),
826 t.port.unwrap_or(9092)
827 )
828 }
829 _ => self.brokers.iter().map(|b| &b.address).join(","),
830 };
831 options.insert("bootstrap.servers".into(), brokers.into());
832 let security_protocol = match (self.tls.is_some(), self.sasl.is_some()) {
833 (false, false) => "PLAINTEXT",
834 (true, false) => "SSL",
835 (false, true) => "SASL_PLAINTEXT",
836 (true, true) => "SASL_SSL",
837 };
838 options.insert("security.protocol".into(), security_protocol.into());
839 if let Some(tls) = &self.tls {
840 if let Some(root_cert) = &tls.root_cert {
841 options.insert("ssl.ca.pem".into(), root_cert.clone());
842 }
843 if let Some(identity) = &tls.identity {
844 options.insert("ssl.key.pem".into(), StringOrSecret::Secret(identity.key));
845 options.insert("ssl.certificate.pem".into(), identity.cert.clone());
846 }
847 }
848 if let Some(sasl) = &self.sasl {
849 options.insert("sasl.mechanisms".into(), (&sasl.mechanism).into());
850 options.insert("sasl.username".into(), sasl.username.clone());
851 if let Some(password) = sasl.password {
852 options.insert("sasl.password".into(), StringOrSecret::Secret(password));
853 }
854 }
855
856 options.insert(
857 "retry.backoff.ms".into(),
858 KAFKA_RETRY_BACKOFF
859 .get(storage_configuration.config_set())
860 .as_millis()
861 .into(),
862 );
863 options.insert(
864 "retry.backoff.max.ms".into(),
865 KAFKA_RETRY_BACKOFF_MAX
866 .get(storage_configuration.config_set())
867 .as_millis()
868 .into(),
869 );
870 options.insert(
871 "reconnect.backoff.ms".into(),
872 KAFKA_RECONNECT_BACKOFF
873 .get(storage_configuration.config_set())
874 .as_millis()
875 .into(),
876 );
877 options.insert(
878 "reconnect.backoff.max.ms".into(),
879 KAFKA_RECONNECT_BACKOFF_MAX
880 .get(storage_configuration.config_set())
881 .as_millis()
882 .into(),
883 );
884
885 let mut config = mz_kafka_util::client::create_new_client_config(
886 storage_configuration
887 .connection_context
888 .librdkafka_log_level,
889 storage_configuration.parameters.kafka_timeout_config,
890 );
891 for (k, v) in options {
892 config.set(
893 k,
894 v.get_string(
895 in_task,
896 &storage_configuration.connection_context.secrets_reader,
897 )
898 .await
899 .context("reading kafka secret")?,
900 );
901 }
902 for (k, v) in extra_options {
903 config.set(*k, v);
904 }
905
906 let aws_config = match self.sasl.as_ref().and_then(|sasl| sasl.aws.as_ref()) {
907 None => None,
908 Some(aws) => Some(
909 aws.connection
910 .load_sdk_config(
911 &storage_configuration.connection_context,
912 aws.connection_id,
913 in_task,
914 )
915 .await?,
916 ),
917 };
918
919 let mut context = TunnelingClientContext::new(
923 context,
924 Handle::current(),
925 storage_configuration
926 .connection_context
927 .ssh_tunnel_manager
928 .clone(),
929 storage_configuration.parameters.ssh_timeout_config,
930 aws_config,
931 in_task,
932 );
933
934 match &self.default_tunnel {
935 Tunnel::Direct => {
936 }
938 Tunnel::AwsPrivatelink(pl) => {
939 context.set_default_tunnel(TunnelConfig::StaticHost(vpc_endpoint_host(
940 pl.connection_id,
941 None, )));
943 }
944 Tunnel::Ssh(ssh_tunnel) => {
945 let secret = storage_configuration
946 .connection_context
947 .secrets_reader
948 .read_in_task_if(in_task, ssh_tunnel.connection_id)
949 .await?;
950 let key_pair = SshKeyPair::from_bytes(&secret)?;
951
952 let resolved = resolve_address(
954 &ssh_tunnel.connection.host,
955 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
956 )
957 .await?;
958 context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig {
959 host: resolved
960 .iter()
961 .map(|a| a.to_string())
962 .collect::<BTreeSet<_>>(),
963 port: ssh_tunnel.connection.port,
964 user: ssh_tunnel.connection.user.clone(),
965 key_pair,
966 }));
967 }
968 }
969
970 for broker in &self.brokers {
971 let mut addr_parts = broker.address.splitn(2, ':');
972 let addr = BrokerAddr {
973 host: addr_parts
974 .next()
975 .context("BROKER is not address:port")?
976 .into(),
977 port: addr_parts
978 .next()
979 .unwrap_or("9092")
980 .parse()
981 .context("parsing BROKER port")?,
982 };
983 match &broker.tunnel {
984 Tunnel::Direct => {
985 }
995 Tunnel::AwsPrivatelink(aws_privatelink) => {
996 let host = mz_cloud_resources::vpc_endpoint_host(
997 aws_privatelink.connection_id,
998 aws_privatelink.availability_zone.as_deref(),
999 );
1000 let port = aws_privatelink.port;
1001 context.add_broker_rewrite(
1002 addr,
1003 BrokerRewrite {
1004 host: host.clone(),
1005 port,
1006 },
1007 );
1008 }
1009 Tunnel::Ssh(ssh_tunnel) => {
1010 let ssh_host_resolved = resolve_address(
1012 &ssh_tunnel.connection.host,
1013 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1014 )
1015 .await?;
1016 context
1017 .add_ssh_tunnel(
1018 addr,
1019 SshTunnelConfig {
1020 host: ssh_host_resolved
1021 .iter()
1022 .map(|a| a.to_string())
1023 .collect::<BTreeSet<_>>(),
1024 port: ssh_tunnel.connection.port,
1025 user: ssh_tunnel.connection.user.clone(),
1026 key_pair: SshKeyPair::from_bytes(
1027 &storage_configuration
1028 .connection_context
1029 .secrets_reader
1030 .read_in_task_if(in_task, ssh_tunnel.connection_id)
1031 .await?,
1032 )?,
1033 },
1034 )
1035 .await
1036 .map_err(ContextCreationError::Ssh)?;
1037 }
1038 }
1039 }
1040
1041 Ok(config.create_with_context(context)?)
1042 }
1043
1044 async fn validate(
1045 &self,
1046 _id: CatalogItemId,
1047 storage_configuration: &StorageConfiguration,
1048 ) -> Result<(), anyhow::Error> {
1049 let (context, error_rx) = MzClientContext::with_errors();
1050 let consumer: BaseConsumer<_> = self
1051 .create_with_context(
1052 storage_configuration,
1053 context,
1054 &BTreeMap::new(),
1055 InTask::No,
1057 )
1058 .await?;
1059 let consumer = Arc::new(consumer);
1060
1061 let timeout = storage_configuration
1062 .parameters
1063 .kafka_timeout_config
1064 .fetch_metadata_timeout;
1065
1066 let result = mz_ore::task::spawn_blocking(|| "kafka_get_metadata", {
1077 let consumer = Arc::clone(&consumer);
1078 move || consumer.fetch_metadata(None, timeout)
1079 })
1080 .await;
1081 match result {
1082 Ok(_) => Ok(()),
1083 Err(err) => {
1088 let main_err = error_rx.try_iter().reduce(|cur, new| match cur {
1092 MzKafkaError::Internal(_) => new,
1093 _ => cur,
1094 });
1095
1096 drop(consumer);
1100
1101 match main_err {
1102 Some(err) => Err(err.into()),
1103 None => Err(err.into()),
1104 }
1105 }
1106 }
1107 }
1108}
1109
1110impl<C: ConnectionAccess> AlterCompatible for KafkaConnection<C> {
1111 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1112 let KafkaConnection {
1113 brokers: _,
1114 default_tunnel: _,
1115 progress_topic,
1116 progress_topic_options,
1117 options: _,
1118 tls: _,
1119 sasl: _,
1120 } = self;
1121
1122 let compatibility_checks = [
1123 (progress_topic == &other.progress_topic, "progress_topic"),
1124 (
1125 progress_topic_options == &other.progress_topic_options,
1126 "progress_topic_options",
1127 ),
1128 ];
1129
1130 for (compatible, field) in compatibility_checks {
1131 if !compatible {
1132 tracing::warn!(
1133 "KafkaConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1134 self,
1135 other
1136 );
1137
1138 return Err(AlterError { id });
1139 }
1140 }
1141
1142 Ok(())
1143 }
1144}
1145
1146#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
1148pub struct CsrConnection<C: ConnectionAccess = InlinedConnection> {
1149 pub url: Url,
1151 pub tls_root_cert: Option<StringOrSecret>,
1153 pub tls_identity: Option<TlsIdentity>,
1156 pub http_auth: Option<CsrConnectionHttpAuth>,
1158 pub tunnel: Tunnel<C>,
1160}
1161
1162impl<R: ConnectionResolver> IntoInlineConnection<CsrConnection, R>
1163 for CsrConnection<ReferencedConnection>
1164{
1165 fn into_inline_connection(self, r: R) -> CsrConnection {
1166 let CsrConnection {
1167 url,
1168 tls_root_cert,
1169 tls_identity,
1170 http_auth,
1171 tunnel,
1172 } = self;
1173 CsrConnection {
1174 url,
1175 tls_root_cert,
1176 tls_identity,
1177 http_auth,
1178 tunnel: tunnel.into_inline_connection(r),
1179 }
1180 }
1181}
1182
1183impl<C: ConnectionAccess> CsrConnection<C> {
1184 fn validate_by_default(&self) -> bool {
1185 true
1186 }
1187}
1188
1189impl CsrConnection {
1190 pub async fn connect(
1192 &self,
1193 storage_configuration: &StorageConfiguration,
1194 in_task: InTask,
1195 ) -> Result<mz_ccsr::Client, CsrConnectError> {
1196 let mut client_config = mz_ccsr::ClientConfig::new(self.url.clone());
1197 if let Some(root_cert) = &self.tls_root_cert {
1198 let root_cert = root_cert
1199 .get_string(
1200 in_task,
1201 &storage_configuration.connection_context.secrets_reader,
1202 )
1203 .await?;
1204 let root_cert = Certificate::from_pem(root_cert.as_bytes())?;
1205 client_config = client_config.add_root_certificate(root_cert);
1206 }
1207
1208 if let Some(tls_identity) = &self.tls_identity {
1209 let key = &storage_configuration
1210 .connection_context
1211 .secrets_reader
1212 .read_string_in_task_if(in_task, tls_identity.key)
1213 .await?;
1214 let cert = tls_identity
1215 .cert
1216 .get_string(
1217 in_task,
1218 &storage_configuration.connection_context.secrets_reader,
1219 )
1220 .await?;
1221 let ident = Identity::from_pem(key.as_bytes(), cert.as_bytes())?;
1222 client_config = client_config.identity(ident);
1223 }
1224
1225 if let Some(http_auth) = &self.http_auth {
1226 let username = http_auth
1227 .username
1228 .get_string(
1229 in_task,
1230 &storage_configuration.connection_context.secrets_reader,
1231 )
1232 .await?;
1233 let password = match http_auth.password {
1234 None => None,
1235 Some(password) => Some(
1236 storage_configuration
1237 .connection_context
1238 .secrets_reader
1239 .read_string_in_task_if(in_task, password)
1240 .await?,
1241 ),
1242 };
1243 client_config = client_config.auth(username, password);
1244 }
1245
1246 let host = self
1248 .url
1249 .host_str()
1250 .ok_or_else(|| anyhow!("url missing host"))?;
1251 match &self.tunnel {
1252 Tunnel::Direct => {
1253 let resolved = resolve_address(
1255 host,
1256 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1257 )
1258 .await?;
1259 client_config = client_config.resolve_to_addrs(
1260 host,
1261 &resolved
1262 .iter()
1263 .map(|addr| SocketAddr::new(*addr, DUMMY_DNS_PORT))
1264 .collect::<Vec<_>>(),
1265 )
1266 }
1267 Tunnel::Ssh(ssh_tunnel) => {
1268 let ssh_tunnel = ssh_tunnel
1269 .connect(
1270 storage_configuration,
1271 host,
1272 self.url.port().unwrap_or(80),
1275 in_task,
1276 )
1277 .await
1278 .map_err(CsrConnectError::Ssh)?;
1279
1280 client_config = client_config
1286 .resolve_to_addrs(
1292 host,
1293 &[SocketAddr::new(
1294 ssh_tunnel.local_addr().ip(),
1295 DUMMY_DNS_PORT,
1296 )],
1297 )
1298 .dynamic_url({
1309 let remote_url = self.url.clone();
1310 move || {
1311 let mut url = remote_url.clone();
1312 url.set_port(Some(ssh_tunnel.local_addr().port()))
1313 .expect("cannot fail");
1314 url
1315 }
1316 });
1317 }
1318 Tunnel::AwsPrivatelink(connection) => {
1319 assert_none!(connection.port);
1320
1321 let privatelink_host = mz_cloud_resources::vpc_endpoint_host(
1322 connection.connection_id,
1323 connection.availability_zone.as_deref(),
1324 );
1325 let addrs: Vec<_> = net::lookup_host((privatelink_host, DUMMY_DNS_PORT))
1326 .await
1327 .context("resolving PrivateLink host")?
1328 .collect();
1329 client_config = client_config.resolve_to_addrs(host, &addrs)
1330 }
1331 }
1332
1333 Ok(client_config.build()?)
1334 }
1335
1336 async fn validate(
1337 &self,
1338 _id: CatalogItemId,
1339 storage_configuration: &StorageConfiguration,
1340 ) -> Result<(), anyhow::Error> {
1341 let client = self
1342 .connect(
1343 storage_configuration,
1344 InTask::No,
1346 )
1347 .await?;
1348 client.list_subjects().await?;
1349 Ok(())
1350 }
1351}
1352
1353impl<C: ConnectionAccess> AlterCompatible for CsrConnection<C> {
1354 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1355 let CsrConnection {
1356 tunnel,
1357 url: _,
1359 tls_root_cert: _,
1360 tls_identity: _,
1361 http_auth: _,
1362 } = self;
1363
1364 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1365
1366 for (compatible, field) in compatibility_checks {
1367 if !compatible {
1368 tracing::warn!(
1369 "CsrConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1370 self,
1371 other
1372 );
1373
1374 return Err(AlterError { id });
1375 }
1376 }
1377 Ok(())
1378 }
1379}
1380
1381#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
1383pub struct TlsIdentity {
1384 pub cert: StringOrSecret,
1386 pub key: CatalogItemId,
1389}
1390
1391#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
1393pub struct CsrConnectionHttpAuth {
1394 pub username: StringOrSecret,
1396 pub password: Option<CatalogItemId>,
1398}
1399
1400#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
1402pub struct PostgresConnection<C: ConnectionAccess = InlinedConnection> {
1403 pub host: String,
1405 pub port: u16,
1407 pub database: String,
1409 pub user: StringOrSecret,
1411 pub password: Option<CatalogItemId>,
1413 pub tunnel: Tunnel<C>,
1415 pub tls_mode: SslMode,
1417 pub tls_root_cert: Option<StringOrSecret>,
1420 pub tls_identity: Option<TlsIdentity>,
1422}
1423
1424impl<R: ConnectionResolver> IntoInlineConnection<PostgresConnection, R>
1425 for PostgresConnection<ReferencedConnection>
1426{
1427 fn into_inline_connection(self, r: R) -> PostgresConnection {
1428 let PostgresConnection {
1429 host,
1430 port,
1431 database,
1432 user,
1433 password,
1434 tunnel,
1435 tls_mode,
1436 tls_root_cert,
1437 tls_identity,
1438 } = self;
1439
1440 PostgresConnection {
1441 host,
1442 port,
1443 database,
1444 user,
1445 password,
1446 tunnel: tunnel.into_inline_connection(r),
1447 tls_mode,
1448 tls_root_cert,
1449 tls_identity,
1450 }
1451 }
1452}
1453
1454impl<C: ConnectionAccess> PostgresConnection<C> {
1455 fn validate_by_default(&self) -> bool {
1456 true
1457 }
1458}
1459
1460impl PostgresConnection<InlinedConnection> {
1461 pub async fn config(
1462 &self,
1463 secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
1464 storage_configuration: &StorageConfiguration,
1465 in_task: InTask,
1466 ) -> Result<mz_postgres_util::Config, anyhow::Error> {
1467 let params = &storage_configuration.parameters;
1468
1469 let mut config = tokio_postgres::Config::new();
1470 config
1471 .host(&self.host)
1472 .port(self.port)
1473 .dbname(&self.database)
1474 .user(&self.user.get_string(in_task, secrets_reader).await?)
1475 .ssl_mode(self.tls_mode);
1476 if let Some(password) = self.password {
1477 let password = secrets_reader
1478 .read_string_in_task_if(in_task, password)
1479 .await?;
1480 config.password(password);
1481 }
1482 if let Some(tls_root_cert) = &self.tls_root_cert {
1483 let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
1484 config.ssl_root_cert(tls_root_cert.as_bytes());
1485 }
1486 if let Some(tls_identity) = &self.tls_identity {
1487 let cert = tls_identity
1488 .cert
1489 .get_string(in_task, secrets_reader)
1490 .await?;
1491 let key = secrets_reader
1492 .read_string_in_task_if(in_task, tls_identity.key)
1493 .await?;
1494 config.ssl_cert(cert.as_bytes()).ssl_key(key.as_bytes());
1495 }
1496
1497 if let Some(connect_timeout) = params.pg_source_connect_timeout {
1498 config.connect_timeout(connect_timeout);
1499 }
1500 if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
1501 config.keepalives_retries(keepalives_retries);
1502 }
1503 if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
1504 config.keepalives_idle(keepalives_idle);
1505 }
1506 if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
1507 config.keepalives_interval(keepalives_interval);
1508 }
1509 if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
1510 config.tcp_user_timeout(tcp_user_timeout);
1511 }
1512
1513 let mut options = vec![];
1514 if let Some(wal_sender_timeout) = params.pg_source_wal_sender_timeout {
1515 options.push(format!(
1516 "--wal_sender_timeout={}",
1517 wal_sender_timeout.as_millis()
1518 ));
1519 };
1520 if params.pg_source_tcp_configure_server {
1521 if let Some(keepalives_retries) = params.pg_source_tcp_keepalives_retries {
1522 options.push(format!("--tcp_keepalives_count={}", keepalives_retries));
1523 }
1524 if let Some(keepalives_idle) = params.pg_source_tcp_keepalives_idle {
1525 options.push(format!(
1526 "--tcp_keepalives_idle={}",
1527 keepalives_idle.as_secs()
1528 ));
1529 }
1530 if let Some(keepalives_interval) = params.pg_source_tcp_keepalives_interval {
1531 options.push(format!(
1532 "--tcp_keepalives_interval={}",
1533 keepalives_interval.as_secs()
1534 ));
1535 }
1536 if let Some(tcp_user_timeout) = params.pg_source_tcp_user_timeout {
1537 options.push(format!(
1538 "--tcp_user_timeout={}",
1539 tcp_user_timeout.as_millis()
1540 ));
1541 }
1542 }
1543 config.options(options.join(" ").as_str());
1544
1545 let tunnel = match &self.tunnel {
1546 Tunnel::Direct => {
1547 let resolved = resolve_address(
1549 &self.host,
1550 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1551 )
1552 .await?;
1553 mz_postgres_util::TunnelConfig::Direct {
1554 resolved_ips: Some(resolved),
1555 }
1556 }
1557 Tunnel::Ssh(SshTunnel {
1558 connection_id,
1559 connection,
1560 }) => {
1561 let secret = secrets_reader
1562 .read_in_task_if(in_task, *connection_id)
1563 .await?;
1564 let key_pair = SshKeyPair::from_bytes(&secret)?;
1565 let resolved = resolve_address(
1567 &connection.host,
1568 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1569 )
1570 .await?;
1571 mz_postgres_util::TunnelConfig::Ssh {
1572 config: SshTunnelConfig {
1573 host: resolved
1574 .iter()
1575 .map(|a| a.to_string())
1576 .collect::<BTreeSet<_>>(),
1577 port: connection.port,
1578 user: connection.user.clone(),
1579 key_pair,
1580 },
1581 }
1582 }
1583 Tunnel::AwsPrivatelink(connection) => {
1584 assert_none!(connection.port);
1585 mz_postgres_util::TunnelConfig::AwsPrivatelink {
1586 connection_id: connection.connection_id,
1587 }
1588 }
1589 };
1590
1591 Ok(mz_postgres_util::Config::new(
1592 config,
1593 tunnel,
1594 params.ssh_timeout_config,
1595 in_task,
1596 )?)
1597 }
1598
1599 pub async fn validate(
1600 &self,
1601 _id: CatalogItemId,
1602 storage_configuration: &StorageConfiguration,
1603 ) -> Result<mz_postgres_util::Client, anyhow::Error> {
1604 let config = self
1605 .config(
1606 &storage_configuration.connection_context.secrets_reader,
1607 storage_configuration,
1608 InTask::No,
1610 )
1611 .await?;
1612 let client = config
1613 .connect(
1614 "connection validation",
1615 &storage_configuration.connection_context.ssh_tunnel_manager,
1616 )
1617 .await?;
1618
1619 let wal_level = mz_postgres_util::get_wal_level(&client).await?;
1620
1621 if wal_level < mz_postgres_util::replication::WalLevel::Logical {
1622 Err(PostgresConnectionValidationError::InsufficientWalLevel { wal_level })?;
1623 }
1624
1625 let max_wal_senders = mz_postgres_util::get_max_wal_senders(&client).await?;
1626
1627 if max_wal_senders < 1 {
1628 Err(PostgresConnectionValidationError::ReplicationDisabled)?;
1629 }
1630
1631 let available_replication_slots =
1632 mz_postgres_util::available_replication_slots(&client).await?;
1633
1634 if available_replication_slots < 2 {
1636 Err(
1637 PostgresConnectionValidationError::InsufficientReplicationSlotsAvailable {
1638 count: 2,
1639 },
1640 )?;
1641 }
1642
1643 Ok(client)
1644 }
1645}
1646
1647#[derive(Debug, Clone, thiserror::Error)]
1648pub enum PostgresConnectionValidationError {
1649 #[error("PostgreSQL server has insufficient number of replication slots available")]
1650 InsufficientReplicationSlotsAvailable { count: usize },
1651 #[error("server must have wal_level >= logical, but has {wal_level}")]
1652 InsufficientWalLevel {
1653 wal_level: mz_postgres_util::replication::WalLevel,
1654 },
1655 #[error("replication disabled on server")]
1656 ReplicationDisabled,
1657}
1658
1659impl PostgresConnectionValidationError {
1660 pub fn detail(&self) -> Option<String> {
1661 match self {
1662 Self::InsufficientReplicationSlotsAvailable { count } => Some(format!(
1663 "executing this statement requires {} replication slot{}",
1664 count,
1665 if *count == 1 { "" } else { "s" }
1666 )),
1667 _ => None,
1668 }
1669 }
1670
1671 pub fn hint(&self) -> Option<String> {
1672 match self {
1673 Self::InsufficientReplicationSlotsAvailable { .. } => Some(
1674 "you might be able to wait for other sources to finish snapshotting and try again"
1675 .into(),
1676 ),
1677 Self::ReplicationDisabled => Some("set max_wal_senders to a value > 0".into()),
1678 _ => None,
1679 }
1680 }
1681}
1682
1683impl<C: ConnectionAccess> AlterCompatible for PostgresConnection<C> {
1684 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1685 let PostgresConnection {
1686 tunnel,
1687 host: _,
1689 port: _,
1690 database: _,
1691 user: _,
1692 password: _,
1693 tls_mode: _,
1694 tls_root_cert: _,
1695 tls_identity: _,
1696 } = self;
1697
1698 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
1699
1700 for (compatible, field) in compatibility_checks {
1701 if !compatible {
1702 tracing::warn!(
1703 "PostgresConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
1704 self,
1705 other
1706 );
1707
1708 return Err(AlterError { id });
1709 }
1710 }
1711 Ok(())
1712 }
1713}
1714
1715#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
1717pub enum Tunnel<C: ConnectionAccess = InlinedConnection> {
1718 Direct,
1720 Ssh(SshTunnel<C>),
1722 AwsPrivatelink(AwsPrivatelink),
1724}
1725
1726impl<R: ConnectionResolver> IntoInlineConnection<Tunnel, R> for Tunnel<ReferencedConnection> {
1727 fn into_inline_connection(self, r: R) -> Tunnel {
1728 match self {
1729 Tunnel::Direct => Tunnel::Direct,
1730 Tunnel::Ssh(ssh) => Tunnel::Ssh(ssh.into_inline_connection(r)),
1731 Tunnel::AwsPrivatelink(awspl) => Tunnel::AwsPrivatelink(awspl),
1732 }
1733 }
1734}
1735
1736impl<C: ConnectionAccess> AlterCompatible for Tunnel<C> {
1737 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
1738 let compatible = match (self, other) {
1739 (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o).is_ok(),
1740 (s, o) => s == o,
1741 };
1742
1743 if !compatible {
1744 tracing::warn!(
1745 "Tunnel incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
1746 self,
1747 other
1748 );
1749
1750 return Err(AlterError { id });
1751 }
1752
1753 Ok(())
1754 }
1755}
1756
1757#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
1761pub enum MySqlSslMode {
1762 Disabled,
1763 Required,
1764 VerifyCa,
1765 VerifyIdentity,
1766}
1767
1768#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
1770pub struct MySqlConnection<C: ConnectionAccess = InlinedConnection> {
1771 pub host: String,
1773 pub port: u16,
1775 pub user: StringOrSecret,
1777 pub password: Option<CatalogItemId>,
1779 pub tunnel: Tunnel<C>,
1781 pub tls_mode: MySqlSslMode,
1783 pub tls_root_cert: Option<StringOrSecret>,
1786 pub tls_identity: Option<TlsIdentity>,
1788 pub aws_connection: Option<AwsConnectionReference<C>>,
1791}
1792
1793impl<R: ConnectionResolver> IntoInlineConnection<MySqlConnection, R>
1794 for MySqlConnection<ReferencedConnection>
1795{
1796 fn into_inline_connection(self, r: R) -> MySqlConnection {
1797 let MySqlConnection {
1798 host,
1799 port,
1800 user,
1801 password,
1802 tunnel,
1803 tls_mode,
1804 tls_root_cert,
1805 tls_identity,
1806 aws_connection,
1807 } = self;
1808
1809 MySqlConnection {
1810 host,
1811 port,
1812 user,
1813 password,
1814 tunnel: tunnel.into_inline_connection(&r),
1815 tls_mode,
1816 tls_root_cert,
1817 tls_identity,
1818 aws_connection: aws_connection.map(|aws| aws.into_inline_connection(&r)),
1819 }
1820 }
1821}
1822
1823impl<C: ConnectionAccess> MySqlConnection<C> {
1824 fn validate_by_default(&self) -> bool {
1825 true
1826 }
1827}
1828
1829impl MySqlConnection<InlinedConnection> {
1830 pub async fn config(
1831 &self,
1832 secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
1833 storage_configuration: &StorageConfiguration,
1834 in_task: InTask,
1835 ) -> Result<mz_mysql_util::Config, anyhow::Error> {
1836 let mut opts = mysql_async::OptsBuilder::default()
1838 .ip_or_hostname(&self.host)
1839 .tcp_port(self.port)
1840 .user(Some(&self.user.get_string(in_task, secrets_reader).await?));
1841
1842 if let Some(password) = self.password {
1843 let password = secrets_reader
1844 .read_string_in_task_if(in_task, password)
1845 .await?;
1846 opts = opts.pass(Some(password));
1847 }
1848
1849 let mut ssl_opts = match self.tls_mode {
1854 MySqlSslMode::Disabled => None,
1855 MySqlSslMode::Required => Some(
1856 mysql_async::SslOpts::default()
1857 .with_danger_accept_invalid_certs(true)
1858 .with_danger_skip_domain_validation(true),
1859 ),
1860 MySqlSslMode::VerifyCa => {
1861 Some(mysql_async::SslOpts::default().with_danger_skip_domain_validation(true))
1862 }
1863 MySqlSslMode::VerifyIdentity => Some(mysql_async::SslOpts::default()),
1864 };
1865
1866 if matches!(
1867 self.tls_mode,
1868 MySqlSslMode::VerifyCa | MySqlSslMode::VerifyIdentity
1869 ) {
1870 if let Some(tls_root_cert) = &self.tls_root_cert {
1871 let tls_root_cert = tls_root_cert.get_string(in_task, secrets_reader).await?;
1872 ssl_opts = ssl_opts.map(|opts| {
1873 opts.with_root_certs(vec![tls_root_cert.as_bytes().to_vec().into()])
1874 });
1875 }
1876 }
1877
1878 if let Some(identity) = &self.tls_identity {
1879 let key = secrets_reader
1880 .read_string_in_task_if(in_task, identity.key)
1881 .await?;
1882 let cert = identity.cert.get_string(in_task, secrets_reader).await?;
1883 let Pkcs12Archive { der, pass } =
1884 mz_tls_util::pkcs12der_from_pem(key.as_bytes(), cert.as_bytes())?;
1885
1886 ssl_opts = ssl_opts.map(|opts| {
1888 opts.with_client_identity(Some(
1889 mysql_async::ClientIdentity::new(der.into()).with_password(pass),
1890 ))
1891 });
1892 }
1893
1894 opts = opts.ssl_opts(ssl_opts);
1895
1896 let tunnel = match &self.tunnel {
1897 Tunnel::Direct => {
1898 let resolved = resolve_address(
1900 &self.host,
1901 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1902 )
1903 .await?;
1904 mz_mysql_util::TunnelConfig::Direct {
1905 resolved_ips: Some(resolved),
1906 }
1907 }
1908 Tunnel::Ssh(SshTunnel {
1909 connection_id,
1910 connection,
1911 }) => {
1912 let secret = secrets_reader
1913 .read_in_task_if(in_task, *connection_id)
1914 .await?;
1915 let key_pair = SshKeyPair::from_bytes(&secret)?;
1916 let resolved = resolve_address(
1918 &connection.host,
1919 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
1920 )
1921 .await?;
1922 mz_mysql_util::TunnelConfig::Ssh {
1923 config: SshTunnelConfig {
1924 host: resolved
1925 .iter()
1926 .map(|a| a.to_string())
1927 .collect::<BTreeSet<_>>(),
1928 port: connection.port,
1929 user: connection.user.clone(),
1930 key_pair,
1931 },
1932 }
1933 }
1934 Tunnel::AwsPrivatelink(connection) => {
1935 assert_none!(connection.port);
1936 mz_mysql_util::TunnelConfig::AwsPrivatelink {
1937 connection_id: connection.connection_id,
1938 }
1939 }
1940 };
1941
1942 let aws_config = match self.aws_connection.as_ref() {
1943 None => None,
1944 Some(aws_ref) => Some(
1945 aws_ref
1946 .connection
1947 .load_sdk_config(
1948 &storage_configuration.connection_context,
1949 aws_ref.connection_id,
1950 in_task,
1951 )
1952 .await?,
1953 ),
1954 };
1955
1956 Ok(mz_mysql_util::Config::new(
1957 opts,
1958 tunnel,
1959 storage_configuration.parameters.ssh_timeout_config,
1960 in_task,
1961 storage_configuration
1962 .parameters
1963 .mysql_source_timeouts
1964 .clone(),
1965 aws_config,
1966 )?)
1967 }
1968
1969 pub async fn validate(
1970 &self,
1971 _id: CatalogItemId,
1972 storage_configuration: &StorageConfiguration,
1973 ) -> Result<MySqlConn, MySqlConnectionValidationError> {
1974 let config = self
1975 .config(
1976 &storage_configuration.connection_context.secrets_reader,
1977 storage_configuration,
1978 InTask::No,
1980 )
1981 .await?;
1982 let mut conn = config
1983 .connect(
1984 "connection validation",
1985 &storage_configuration.connection_context.ssh_tunnel_manager,
1986 )
1987 .await?;
1988
1989 let mut setting_errors = vec![];
1991 let gtid_res = mz_mysql_util::ensure_gtid_consistency(&mut conn).await;
1992 let binlog_res = mz_mysql_util::ensure_full_row_binlog_format(&mut conn).await;
1993 let order_res = mz_mysql_util::ensure_replication_commit_order(&mut conn).await;
1994 for res in [gtid_res, binlog_res, order_res] {
1995 match res {
1996 Err(MySqlError::InvalidSystemSetting {
1997 setting,
1998 expected,
1999 actual,
2000 }) => {
2001 setting_errors.push((setting, expected, actual));
2002 }
2003 Err(err) => Err(err)?,
2004 Ok(()) => {}
2005 }
2006 }
2007 if !setting_errors.is_empty() {
2008 Err(MySqlConnectionValidationError::ReplicationSettingsError(
2009 setting_errors,
2010 ))?;
2011 }
2012
2013 Ok(conn)
2014 }
2015}
2016
2017#[derive(Debug, thiserror::Error)]
2018pub enum MySqlConnectionValidationError {
2019 #[error("Invalid MySQL system replication settings")]
2020 ReplicationSettingsError(Vec<(String, String, String)>),
2021 #[error(transparent)]
2022 Client(#[from] MySqlError),
2023 #[error("{}", .0.display_with_causes())]
2024 Other(#[from] anyhow::Error),
2025}
2026
2027impl MySqlConnectionValidationError {
2028 pub fn detail(&self) -> Option<String> {
2029 match self {
2030 Self::ReplicationSettingsError(settings) => Some(format!(
2031 "Invalid MySQL system replication settings: {}",
2032 itertools::join(
2033 settings.iter().map(|(setting, expected, actual)| format!(
2034 "{}: expected {}, got {}",
2035 setting, expected, actual
2036 )),
2037 "; "
2038 )
2039 )),
2040 _ => None,
2041 }
2042 }
2043
2044 pub fn hint(&self) -> Option<String> {
2045 match self {
2046 Self::ReplicationSettingsError(_) => {
2047 Some("Set the necessary MySQL database system settings.".into())
2048 }
2049 _ => None,
2050 }
2051 }
2052}
2053
2054impl<C: ConnectionAccess> AlterCompatible for MySqlConnection<C> {
2055 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2056 let MySqlConnection {
2057 tunnel,
2058 host: _,
2060 port: _,
2061 user: _,
2062 password: _,
2063 tls_mode: _,
2064 tls_root_cert: _,
2065 tls_identity: _,
2066 aws_connection: _,
2067 } = self;
2068
2069 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2070
2071 for (compatible, field) in compatibility_checks {
2072 if !compatible {
2073 tracing::warn!(
2074 "MySqlConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2075 self,
2076 other
2077 );
2078
2079 return Err(AlterError { id });
2080 }
2081 }
2082 Ok(())
2083 }
2084}
2085
2086#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2093pub struct SqlServerConnectionDetails<C: ConnectionAccess = InlinedConnection> {
2094 pub host: String,
2096 pub port: u16,
2098 pub database: String,
2100 pub user: StringOrSecret,
2102 pub password: CatalogItemId,
2104 pub tunnel: Tunnel<C>,
2106 pub encryption: mz_sql_server_util::config::EncryptionLevel,
2108 pub certificate_validation_policy: mz_sql_server_util::config::CertificateValidationPolicy,
2110 pub tls_root_cert: Option<StringOrSecret>,
2112}
2113
2114impl<C: ConnectionAccess> SqlServerConnectionDetails<C> {
2115 fn validate_by_default(&self) -> bool {
2116 true
2117 }
2118}
2119
2120impl SqlServerConnectionDetails<InlinedConnection> {
2121 pub async fn validate(
2123 &self,
2124 _id: CatalogItemId,
2125 storage_configuration: &StorageConfiguration,
2126 ) -> Result<mz_sql_server_util::Client, anyhow::Error> {
2127 let config = self
2128 .resolve_config(
2129 &storage_configuration.connection_context.secrets_reader,
2130 storage_configuration,
2131 InTask::No,
2132 )
2133 .await?;
2134 tracing::debug!(?config, "Validating SQL Server connection");
2135
2136 let mut client = mz_sql_server_util::Client::connect(config).await?;
2137
2138 let mut replication_errors = vec![];
2143 for error in [
2144 mz_sql_server_util::inspect::ensure_database_cdc_enabled(&mut client).await,
2145 mz_sql_server_util::inspect::ensure_snapshot_isolation_enabled(&mut client).await,
2146 mz_sql_server_util::inspect::ensure_sql_server_agent_running(&mut client).await,
2147 ] {
2148 match error {
2149 Err(mz_sql_server_util::SqlServerError::InvalidSystemSetting {
2150 name,
2151 expected,
2152 actual,
2153 }) => replication_errors.push((name, expected, actual)),
2154 Err(other) => Err(other)?,
2155 Ok(()) => (),
2156 }
2157 }
2158 if !replication_errors.is_empty() {
2159 Err(SqlServerConnectionValidationError::ReplicationSettingsError(replication_errors))?;
2160 }
2161
2162 Ok(client)
2163 }
2164
2165 pub async fn resolve_config(
2175 &self,
2176 secrets_reader: &Arc<dyn mz_secrets::SecretsReader>,
2177 storage_configuration: &StorageConfiguration,
2178 in_task: InTask,
2179 ) -> Result<mz_sql_server_util::Config, anyhow::Error> {
2180 let dyncfg = storage_configuration.config_set();
2181 let mut inner_config = tiberius::Config::new();
2182
2183 inner_config.host(&self.host);
2185 inner_config.port(self.port);
2186 inner_config.database(self.database.clone());
2187 inner_config.encryption(self.encryption.into());
2188 match self.certificate_validation_policy {
2189 mz_sql_server_util::config::CertificateValidationPolicy::TrustAll => {
2190 inner_config.trust_cert()
2191 }
2192 mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA => {
2193 inner_config.trust_cert_ca_pem(
2194 self.tls_root_cert
2195 .as_ref()
2196 .unwrap()
2197 .get_string(in_task, secrets_reader)
2198 .await
2199 .context("ca certificate")?,
2200 );
2201 }
2202 mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem => (), }
2204
2205 inner_config.application_name("materialize");
2206
2207 let user = self
2209 .user
2210 .get_string(in_task, secrets_reader)
2211 .await
2212 .context("username")?;
2213 let password = secrets_reader
2214 .read_string_in_task_if(in_task, self.password)
2215 .await
2216 .context("password")?;
2217 inner_config.authentication(tiberius::AuthMethod::sql_server(user, password));
2220
2221 let enfoce_external_addresses = ENFORCE_EXTERNAL_ADDRESSES.get(dyncfg);
2224
2225 let tunnel = match &self.tunnel {
2226 Tunnel::Direct => mz_sql_server_util::config::TunnelConfig::Direct,
2227 Tunnel::Ssh(SshTunnel {
2228 connection_id,
2229 connection: ssh_connection,
2230 }) => {
2231 let secret = secrets_reader
2232 .read_in_task_if(in_task, *connection_id)
2233 .await
2234 .context("ssh secret")?;
2235 let key_pair = SshKeyPair::from_bytes(&secret).context("ssh key pair")?;
2236 let addresses = resolve_address(&ssh_connection.host, enfoce_external_addresses)
2239 .await
2240 .context("ssh tunnel")?;
2241
2242 let config = SshTunnelConfig {
2243 host: addresses.into_iter().map(|a| a.to_string()).collect(),
2244 port: ssh_connection.port,
2245 user: ssh_connection.user.clone(),
2246 key_pair,
2247 };
2248 mz_sql_server_util::config::TunnelConfig::Ssh {
2249 config,
2250 manager: storage_configuration
2251 .connection_context
2252 .ssh_tunnel_manager
2253 .clone(),
2254 timeout: storage_configuration.parameters.ssh_timeout_config.clone(),
2255 host: self.host.clone(),
2256 port: self.port,
2257 }
2258 }
2259 Tunnel::AwsPrivatelink(private_link_connection) => {
2260 assert_none!(private_link_connection.port);
2261 mz_sql_server_util::config::TunnelConfig::AwsPrivatelink {
2262 connection_id: private_link_connection.connection_id,
2263 port: self.port,
2264 }
2265 }
2266 };
2267
2268 Ok(mz_sql_server_util::Config::new(
2269 inner_config,
2270 tunnel,
2271 in_task,
2272 ))
2273 }
2274}
2275
2276#[derive(Debug, Clone, thiserror::Error)]
2277pub enum SqlServerConnectionValidationError {
2278 #[error("Invalid SQL Server system replication settings")]
2279 ReplicationSettingsError(Vec<(String, String, String)>),
2280}
2281
2282impl SqlServerConnectionValidationError {
2283 pub fn detail(&self) -> Option<String> {
2284 match self {
2285 Self::ReplicationSettingsError(settings) => Some(format!(
2286 "Invalid SQL Server system replication settings: {}",
2287 itertools::join(
2288 settings.iter().map(|(setting, expected, actual)| format!(
2289 "{}: expected {}, got {}",
2290 setting, expected, actual
2291 )),
2292 "; "
2293 )
2294 )),
2295 }
2296 }
2297
2298 pub fn hint(&self) -> Option<String> {
2299 match self {
2300 _ => None,
2301 }
2302 }
2303}
2304
2305impl<R: ConnectionResolver> IntoInlineConnection<SqlServerConnectionDetails, R>
2306 for SqlServerConnectionDetails<ReferencedConnection>
2307{
2308 fn into_inline_connection(self, r: R) -> SqlServerConnectionDetails {
2309 let SqlServerConnectionDetails {
2310 host,
2311 port,
2312 database,
2313 user,
2314 password,
2315 tunnel,
2316 encryption,
2317 certificate_validation_policy,
2318 tls_root_cert,
2319 } = self;
2320
2321 SqlServerConnectionDetails {
2322 host,
2323 port,
2324 database,
2325 user,
2326 password,
2327 tunnel: tunnel.into_inline_connection(&r),
2328 encryption,
2329 certificate_validation_policy,
2330 tls_root_cert,
2331 }
2332 }
2333}
2334
2335impl<C: ConnectionAccess> AlterCompatible for SqlServerConnectionDetails<C> {
2336 fn alter_compatible(
2337 &self,
2338 id: mz_repr::GlobalId,
2339 other: &Self,
2340 ) -> Result<(), crate::controller::AlterError> {
2341 let SqlServerConnectionDetails {
2342 tunnel,
2343 host: _,
2345 port: _,
2346 database: _,
2347 user: _,
2348 password: _,
2349 encryption: _,
2350 certificate_validation_policy: _,
2351 tls_root_cert: _,
2352 } = self;
2353
2354 let compatibility_checks = [(tunnel.alter_compatible(id, &other.tunnel).is_ok(), "tunnel")];
2355
2356 for (compatible, field) in compatibility_checks {
2357 if !compatible {
2358 tracing::warn!(
2359 "SqlServerConnectionDetails incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2360 self,
2361 other
2362 );
2363
2364 return Err(AlterError { id });
2365 }
2366 }
2367 Ok(())
2368 }
2369}
2370
2371#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2373pub struct SshConnection {
2374 pub host: String,
2375 pub port: u16,
2376 pub user: String,
2377}
2378
2379use self::inline::{
2380 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
2381 ReferencedConnection,
2382};
2383
2384impl AlterCompatible for SshConnection {
2385 fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
2386 Ok(())
2388 }
2389}
2390
2391#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2393pub struct AwsPrivatelink {
2394 pub connection_id: CatalogItemId,
2396 pub availability_zone: Option<String>,
2398 pub port: Option<u16>,
2401}
2402
2403impl AlterCompatible for AwsPrivatelink {
2404 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2405 let AwsPrivatelink {
2406 connection_id,
2407 availability_zone: _,
2408 port: _,
2409 } = self;
2410
2411 let compatibility_checks = [(connection_id == &other.connection_id, "connection_id")];
2412
2413 for (compatible, field) in compatibility_checks {
2414 if !compatible {
2415 tracing::warn!(
2416 "AwsPrivatelink incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2417 self,
2418 other
2419 );
2420
2421 return Err(AlterError { id });
2422 }
2423 }
2424
2425 Ok(())
2426 }
2427}
2428
2429#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
2431pub struct SshTunnel<C: ConnectionAccess = InlinedConnection> {
2432 pub connection_id: CatalogItemId,
2434 pub connection: C::Ssh,
2436}
2437
2438impl<R: ConnectionResolver> IntoInlineConnection<SshTunnel, R> for SshTunnel<ReferencedConnection> {
2439 fn into_inline_connection(self, r: R) -> SshTunnel {
2440 let SshTunnel {
2441 connection,
2442 connection_id,
2443 } = self;
2444
2445 SshTunnel {
2446 connection: r.resolve_connection(connection).unwrap_ssh(),
2447 connection_id,
2448 }
2449 }
2450}
2451
2452impl SshTunnel<InlinedConnection> {
2453 async fn connect(
2456 &self,
2457 storage_configuration: &StorageConfiguration,
2458 remote_host: &str,
2459 remote_port: u16,
2460 in_task: InTask,
2461 ) -> Result<ManagedSshTunnelHandle, anyhow::Error> {
2462 let resolved = resolve_address(
2464 &self.connection.host,
2465 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2466 )
2467 .await?;
2468 storage_configuration
2469 .connection_context
2470 .ssh_tunnel_manager
2471 .connect(
2472 SshTunnelConfig {
2473 host: resolved
2474 .iter()
2475 .map(|a| a.to_string())
2476 .collect::<BTreeSet<_>>(),
2477 port: self.connection.port,
2478 user: self.connection.user.clone(),
2479 key_pair: SshKeyPair::from_bytes(
2480 &storage_configuration
2481 .connection_context
2482 .secrets_reader
2483 .read_in_task_if(in_task, self.connection_id)
2484 .await?,
2485 )?,
2486 },
2487 remote_host,
2488 remote_port,
2489 storage_configuration.parameters.ssh_timeout_config,
2490 in_task,
2491 )
2492 .await
2493 }
2494}
2495
2496impl<C: ConnectionAccess> AlterCompatible for SshTunnel<C> {
2497 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
2498 let SshTunnel {
2499 connection_id,
2500 connection,
2501 } = self;
2502
2503 let compatibility_checks = [
2504 (connection_id == &other.connection_id, "connection_id"),
2505 (
2506 connection.alter_compatible(id, &other.connection).is_ok(),
2507 "connection",
2508 ),
2509 ];
2510
2511 for (compatible, field) in compatibility_checks {
2512 if !compatible {
2513 tracing::warn!(
2514 "SshTunnel incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
2515 self,
2516 other
2517 );
2518
2519 return Err(AlterError { id });
2520 }
2521 }
2522
2523 Ok(())
2524 }
2525}
2526
2527impl SshConnection {
2528 #[allow(clippy::unused_async)]
2529 async fn validate(
2530 &self,
2531 id: CatalogItemId,
2532 storage_configuration: &StorageConfiguration,
2533 ) -> Result<(), anyhow::Error> {
2534 let secret = storage_configuration
2535 .connection_context
2536 .secrets_reader
2537 .read_in_task_if(
2538 InTask::No,
2540 id,
2541 )
2542 .await?;
2543 let key_pair = SshKeyPair::from_bytes(&secret)?;
2544
2545 let resolved = resolve_address(
2547 &self.host,
2548 ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
2549 )
2550 .await?;
2551
2552 let config = SshTunnelConfig {
2553 host: resolved
2554 .iter()
2555 .map(|a| a.to_string())
2556 .collect::<BTreeSet<_>>(),
2557 port: self.port,
2558 user: self.user.clone(),
2559 key_pair,
2560 };
2561 config
2564 .validate(storage_configuration.parameters.ssh_timeout_config)
2565 .await
2566 }
2567
2568 fn validate_by_default(&self) -> bool {
2569 false
2570 }
2571}
2572
2573impl AwsPrivatelinkConnection {
2574 #[allow(clippy::unused_async)]
2575 async fn validate(
2576 &self,
2577 id: CatalogItemId,
2578 storage_configuration: &StorageConfiguration,
2579 ) -> Result<(), anyhow::Error> {
2580 let Some(ref cloud_resource_reader) = storage_configuration
2581 .connection_context
2582 .cloud_resource_reader
2583 else {
2584 return Err(anyhow!("AWS PrivateLink connections are unsupported"));
2585 };
2586
2587 let status = cloud_resource_reader.read(id).await?;
2589
2590 let availability = status
2591 .conditions
2592 .as_ref()
2593 .and_then(|conditions| conditions.iter().find(|c| c.type_ == "Available"));
2594
2595 match availability {
2596 Some(condition) if condition.status == "True" => Ok(()),
2597 Some(condition) => Err(anyhow!("{}", condition.message)),
2598 None => Err(anyhow!("Endpoint availability is unknown")),
2599 }
2600 }
2601
2602 fn validate_by_default(&self) -> bool {
2603 false
2604 }
2605}