1use std::collections::{BTreeMap, BTreeSet};
13
14use anyhow::Context;
15use array_concat::concat_arrays;
16use itertools::Itertools;
17use maplit::btreemap;
18use mz_ore::num::NonNeg;
19use mz_ore::str::StrExt;
20use mz_repr::CatalogItemId;
21use mz_sql_parser::ast::ConnectionOptionName::*;
22use mz_sql_parser::ast::display::AstDisplay;
23use mz_sql_parser::ast::{
24 ConnectionDefaultAwsPrivatelink, ConnectionOption, ConnectionOptionName, CreateConnectionType,
25 KafkaBroker, KafkaBrokerAwsPrivatelinkOption, KafkaBrokerAwsPrivatelinkOptionName,
26 KafkaBrokerTunnel,
27};
28use mz_ssh_util::keys::SshKeyPair;
29use mz_storage_types::connections::aws::{
30 AwsAssumeRole, AwsAuth, AwsConnection, AwsConnectionReference, AwsCredentials,
31};
32use mz_storage_types::connections::inline::ReferencedConnection;
33use mz_storage_types::connections::string_or_secret::StringOrSecret;
34use mz_storage_types::connections::{
35 AwsPrivatelink, AwsPrivatelinkConnection, CsrConnection, CsrConnectionHttpAuth,
36 IcebergCatalogConnection, IcebergCatalogImpl, IcebergCatalogType, KafkaConnection,
37 KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, MySqlConnection, MySqlSslMode,
38 PostgresConnection, RestIcebergCatalog, S3TablesRestIcebergCatalog, SqlServerConnectionDetails,
39 SshConnection, SshTunnel, TlsIdentity, Tunnel,
40};
41
42use crate::names::Aug;
43use crate::plan::statement::{Connection, ResolvedItemName};
44use crate::plan::with_options::{self};
45use crate::plan::{ConnectionDetails, PlanError, SshKey, StatementContext};
46use crate::session::vars::{self, ENABLE_AWS_MSK_IAM_AUTH};
47
48generate_extracted_config!(
49 ConnectionOption,
50 (AccessKeyId, StringOrSecret),
51 (AssumeRoleArn, String),
52 (AssumeRoleSessionName, String),
53 (AvailabilityZones, Vec<String>),
54 (AwsConnection, with_options::Object),
55 (AwsPrivatelink, ConnectionDefaultAwsPrivatelink<Aug>),
56 (Broker, Vec<KafkaBroker<Aug>>),
58 (Brokers, Vec<KafkaBroker<Aug>>),
59 (Credential, StringOrSecret),
60 (Database, String),
61 (Endpoint, String),
62 (Host, String),
63 (Password, with_options::Secret),
64 (Port, u16),
65 (ProgressTopic, String),
66 (ProgressTopicReplicationFactor, i32),
67 (PublicKey1, String),
68 (PublicKey2, String),
69 (Region, String),
70 (SaslMechanisms, String),
71 (SaslPassword, with_options::Secret),
72 (SaslUsername, StringOrSecret),
73 (Scope, String),
74 (SecretAccessKey, with_options::Secret),
75 (SecurityProtocol, String),
76 (ServiceName, String),
77 (SshTunnel, with_options::Object),
78 (SslCertificate, StringOrSecret),
79 (SslCertificateAuthority, StringOrSecret),
80 (SslKey, with_options::Secret),
81 (SslMode, String),
82 (SessionToken, StringOrSecret),
83 (CatalogType, IcebergCatalogType),
84 (Url, String),
85 (User, StringOrSecret),
86 (Warehouse, String)
87);
88
89generate_extracted_config!(
90 KafkaBrokerAwsPrivatelinkOption,
91 (AvailabilityZone, String),
92 (Port, u16)
93);
94
95pub(crate) const INALTERABLE_OPTIONS: &[ConnectionOptionName] =
97 &[ProgressTopic, ProgressTopicReplicationFactor];
98
99pub(crate) const MUTUALLY_EXCLUSIVE_SETS: &[&[ConnectionOptionName]] = &[&[Broker, Brokers]];
101
102pub(super) fn validate_options_per_connection_type(
103 t: CreateConnectionType,
104 mut options: BTreeSet<ConnectionOptionName>,
105) -> Result<(), PlanError> {
106 use mz_sql_parser::ast::ConnectionOptionName::*;
107 let permitted_options = match t {
108 CreateConnectionType::Aws => [
109 AccessKeyId,
110 SecretAccessKey,
111 SessionToken,
112 Endpoint,
113 Region,
114 AssumeRoleArn,
115 AssumeRoleSessionName,
116 ]
117 .as_slice(),
118 CreateConnectionType::AwsPrivatelink => &[AvailabilityZones, Port, ServiceName],
119 CreateConnectionType::Csr => &[
120 AwsPrivatelink,
121 Password,
122 Port,
123 SshTunnel,
124 SslCertificate,
125 SslCertificateAuthority,
126 SslKey,
127 Url,
128 User,
129 ],
130 CreateConnectionType::Kafka => &[
131 AwsConnection,
132 Broker,
133 Brokers,
134 ProgressTopic,
135 ProgressTopicReplicationFactor,
136 AwsPrivatelink,
137 SshTunnel,
138 SslKey,
139 SslCertificate,
140 SslCertificateAuthority,
141 SaslMechanisms,
142 SaslUsername,
143 SaslPassword,
144 SecurityProtocol,
145 ],
146 CreateConnectionType::Postgres => &[
147 AwsPrivatelink,
148 Database,
149 Host,
150 Password,
151 Port,
152 SshTunnel,
153 SslCertificate,
154 SslCertificateAuthority,
155 SslKey,
156 SslMode,
157 User,
158 ],
159 CreateConnectionType::Ssh => &[Host, Port, User, PublicKey1, PublicKey2],
160 CreateConnectionType::MySql => &[
161 AwsPrivatelink,
162 Host,
163 Password,
164 Port,
165 SshTunnel,
166 SslCertificate,
167 SslCertificateAuthority,
168 SslKey,
169 SslMode,
170 User,
171 AwsConnection,
172 ],
173 CreateConnectionType::SqlServer => &[
174 AwsPrivatelink,
175 Database,
176 Host,
177 Password,
178 Port,
179 SshTunnel,
180 SslCertificate,
181 SslCertificateAuthority,
182 SslKey,
183 SslMode,
184 User,
185 ],
186 CreateConnectionType::IcebergCatalog => &[
187 AwsConnection,
188 CatalogType,
189 Credential,
190 Scope,
191 Url,
192 Warehouse,
193 ],
194 };
195
196 for o in permitted_options {
197 options.remove(o);
198 }
199
200 if !options.is_empty() {
201 sql_bail!(
202 "{} connections do not support {} values",
203 t,
204 options.iter().join(", ")
205 )
206 }
207
208 Ok(())
209}
210
211impl ConnectionOptionExtracted {
212 pub(super) fn ensure_only_valid_options(
213 &self,
214 t: CreateConnectionType,
215 ) -> Result<(), PlanError> {
216 validate_options_per_connection_type(t, self.seen.clone())
217 }
218
219 pub fn try_into_connection_details(
220 self,
221 scx: &StatementContext,
222 connection_type: CreateConnectionType,
223 ) -> Result<ConnectionDetails, PlanError> {
224 self.ensure_only_valid_options(connection_type)?;
225
226 let connection: ConnectionDetails = match connection_type {
227 CreateConnectionType::Aws => {
228 let credentials = match (
229 self.access_key_id,
230 self.secret_access_key,
231 self.session_token,
232 ) {
233 (Some(access_key_id), Some(secret_access_key), session_token) => {
234 Some(AwsCredentials {
235 access_key_id,
236 secret_access_key: secret_access_key.into(),
237 session_token,
238 })
239 }
240 (None, None, None) => None,
241 _ => {
242 sql_bail!(
243 "must specify both ACCESS KEY ID and SECRET ACCESS KEY with optional SESSION TOKEN"
244 );
245 }
246 };
247
248 let assume_role = match (self.assume_role_arn, self.assume_role_session_name) {
249 (Some(arn), session_name) => Some(AwsAssumeRole { arn, session_name }),
250 (None, Some(_)) => {
251 sql_bail!(
252 "must specify ASSUME ROLE ARN with optional ASSUME ROLE SESSION NAME"
253 );
254 }
255 _ => None,
256 };
257
258 let auth = match (credentials, assume_role) {
259 (None, None) => sql_bail!(
260 "must specify either ASSUME ROLE ARN or ACCESS KEY ID and SECRET ACCESS KEY"
261 ),
262 (Some(credentials), None) => AwsAuth::Credentials(credentials),
263 (None, Some(assume_role)) => AwsAuth::AssumeRole(assume_role),
264 (Some(_), Some(_)) => {
265 sql_bail!("cannot specify both ACCESS KEY ID and ASSUME ROLE ARN");
266 }
267 };
268
269 ConnectionDetails::Aws(AwsConnection {
270 auth,
271 endpoint: match self.endpoint {
272 Some(endpoint) if !endpoint.is_empty() => Some(endpoint),
277 _ => None,
278 },
279 region: self.region,
280 })
281 }
282 CreateConnectionType::AwsPrivatelink => {
283 let connection = AwsPrivatelinkConnection {
284 service_name: self
285 .service_name
286 .ok_or_else(|| sql_err!("SERVICE NAME option is required"))?,
287 availability_zones: self
288 .availability_zones
289 .ok_or_else(|| sql_err!("AVAILABILITY ZONES option is required"))?,
290 };
291 if let Some(supported_azs) = scx.catalog.aws_privatelink_availability_zones() {
292 let mut unique_azs: BTreeSet<String> = BTreeSet::new();
293 let mut duplicate_azs: BTreeSet<String> = BTreeSet::new();
294 for connection_az in &connection.availability_zones {
296 if unique_azs.contains(connection_az) {
297 duplicate_azs.insert(connection_az.to_string());
298 } else {
299 unique_azs.insert(connection_az.to_string());
300 }
301 if !supported_azs.contains(connection_az) {
302 return Err(PlanError::InvalidPrivatelinkAvailabilityZone {
303 name: connection_az.to_string(),
304 supported_azs,
305 });
306 }
307 }
308 if duplicate_azs.len() > 0 {
309 return Err(PlanError::DuplicatePrivatelinkAvailabilityZone {
310 duplicate_azs,
311 });
312 }
313 }
314 ConnectionDetails::AwsPrivatelink(connection)
315 }
316 CreateConnectionType::Kafka => {
317 let (tls, sasl) = plan_kafka_security(scx, &self)?;
318
319 ConnectionDetails::Kafka(KafkaConnection {
320 brokers: self.get_brokers(scx)?,
321 default_tunnel: scx
322 .build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?,
323 progress_topic: self.progress_topic,
324 progress_topic_options: KafkaTopicOptions {
325 partition_count: Some(NonNeg::try_from(1).expect("1 is positive")),
329 replication_factor: self.progress_topic_replication_factor.map(|val| {
330 if val <= 0 {
331 Err(sql_err!("invalid CONNECTION: PROGRESS TOPIC REPLICATION FACTOR must be greater than 0"))?
332 }
333 NonNeg::try_from(val).map_err(|e| sql_err!("{e}"))
334 }).transpose()?,
335 topic_config: btreemap! {
336 "cleanup.policy".to_string() => "compact".to_string(),
337 },
338 },
339 options: BTreeMap::new(),
340 tls,
341 sasl,
342 })
343 }
344 CreateConnectionType::Csr => {
345 let url: reqwest::Url = match self.url {
346 Some(url) => url
347 .parse()
348 .map_err(|e| sql_err!("parsing schema registry url: {e}"))?,
349 None => sql_bail!("invalid CONNECTION: must specify URL"),
350 };
351 let _ = url
352 .host_str()
353 .ok_or_else(|| sql_err!("invalid CONNECTION: URL must specify domain name"))?;
354 if url.path() != "/" {
355 sql_bail!("invalid CONNECTION: URL must have an empty path");
356 }
357 let cert = self.ssl_certificate;
358 let key = self.ssl_key.map(|secret| secret.into());
359 let tls_identity = match (cert, key) {
360 (None, None) => None,
361 (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
362 _ => sql_bail!(
363 "invalid CONNECTION: reading from SSL-auth Confluent Schema Registry requires both SSL KEY and SSL CERTIFICATE"
364 ),
365 };
366 let http_auth = self.user.map(|username| CsrConnectionHttpAuth {
367 username,
368 password: self.password.map(|secret| secret.into()),
369 });
370
371 if let Some(privatelink) = self.aws_privatelink.as_ref() {
373 if privatelink.port.is_some() {
374 sql_bail!(
375 "invalid CONNECTION: CONFLUENT SCHEMA REGISTRY does not support PORT for AWS PRIVATELINK"
376 )
377 }
378 }
379 let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;
380
381 ConnectionDetails::Csr(CsrConnection {
382 url,
383 tls_root_cert: self.ssl_certificate_authority,
384 tls_identity,
385 http_auth,
386 tunnel,
387 })
388 }
389 CreateConnectionType::Postgres => {
390 let cert = self.ssl_certificate;
391 let key = self.ssl_key.map(|secret| secret.into());
392 let tls_identity = match (cert, key) {
393 (None, None) => None,
394 (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
395 _ => sql_bail!(
396 "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
397 ),
398 };
399 let tls_mode = match self.ssl_mode.as_ref().map(|m| m.as_str()) {
400 None | Some("disable") => tokio_postgres::config::SslMode::Disable,
401 Some("require") | Some("required") => tokio_postgres::config::SslMode::Require,
404 Some("verify_ca") | Some("verify-ca") => {
405 tokio_postgres::config::SslMode::VerifyCa
406 }
407 Some("verify_full") | Some("verify-full") => {
408 tokio_postgres::config::SslMode::VerifyFull
409 }
410 Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
411 };
412
413 if let Some(privatelink) = self.aws_privatelink.as_ref() {
415 if privatelink.port.is_some() {
416 sql_bail!(
417 "invalid CONNECTION: POSTGRES does not support PORT for AWS PRIVATELINK"
418 )
419 }
420 }
421 let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;
422
423 ConnectionDetails::Postgres(PostgresConnection {
424 database: self
425 .database
426 .ok_or_else(|| sql_err!("DATABASE option is required"))?,
427 password: self.password.map(|password| password.into()),
428 host: self
429 .host
430 .ok_or_else(|| sql_err!("HOST option is required"))?,
431 port: self.port.unwrap_or(5432_u16),
432 tunnel,
433 tls_mode,
434 tls_root_cert: self.ssl_certificate_authority,
435 tls_identity,
436 user: self
437 .user
438 .ok_or_else(|| sql_err!("USER option is required"))?,
439 })
440 }
441 CreateConnectionType::Ssh => {
442 let ensure_key = |public_key| match public_key {
443 Some(public_key) => Ok::<_, anyhow::Error>(SshKey::PublicOnly(public_key)),
444 None => {
445 let key = SshKeyPair::new().context("creating SSH key")?;
446 Ok(SshKey::Both(key))
447 }
448 };
449 ConnectionDetails::Ssh {
450 connection: SshConnection {
451 host: self
452 .host
453 .ok_or_else(|| sql_err!("HOST option is required"))?,
454 port: self.port.unwrap_or(22_u16),
455 user: match self
456 .user
457 .ok_or_else(|| sql_err!("USER option is required"))?
458 {
459 StringOrSecret::String(user) => user,
460 StringOrSecret::Secret(_) => {
461 sql_bail!(
462 "SSH connections do not support supplying USER value as SECRET"
463 )
464 }
465 },
466 },
467 key_1: ensure_key(self.public_key1)?,
468 key_2: ensure_key(self.public_key2)?,
469 }
470 }
471 CreateConnectionType::MySql => {
472 let aws_connection = get_aws_connection_reference(scx, &self)?;
473 if aws_connection.is_some() && self.password.is_some() {
474 sql_bail!(
475 "invalid CONNECTION: AWS IAM authentication is not supported with password"
476 );
477 }
478
479 let cert = self.ssl_certificate;
480 let key = self.ssl_key.map(|secret| secret.into());
481 let tls_identity = match (cert, key) {
482 (None, None) => None,
483 (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
484 _ => sql_bail!(
485 "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
486 ),
487 };
488 let tls_mode = match self
491 .ssl_mode
492 .map(|f| f.to_uppercase())
493 .as_ref()
494 .map(|m| m.as_str())
495 {
496 None | Some("DISABLED") => {
497 if aws_connection.is_some() {
498 sql_bail!(
499 "invalid CONNECTION: AWS IAM authentication requires SSL to be enabled"
500 )
501 }
502 MySqlSslMode::Disabled
503 }
504 Some("REQUIRED") | Some("REQUIRE") => MySqlSslMode::Required,
507 Some("VERIFY_CA") | Some("VERIFY-CA") => MySqlSslMode::VerifyCa,
508 Some("VERIFY_IDENTITY") | Some("VERIFY-IDENTITY") => {
509 MySqlSslMode::VerifyIdentity
510 }
511 Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
512 };
513
514 if let Some(privatelink) = self.aws_privatelink.as_ref() {
516 if privatelink.port.is_some() {
517 sql_bail!(
518 "invalid CONNECTION: MYSQL does not support PORT for AWS PRIVATELINK"
519 )
520 }
521 }
522 let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;
523
524 ConnectionDetails::MySql(MySqlConnection {
525 password: self.password.map(|password| password.into()),
526 host: self
527 .host
528 .ok_or_else(|| sql_err!("HOST option is required"))?,
529 port: self.port.unwrap_or(3306_u16),
530 tunnel,
531 tls_mode,
532 tls_root_cert: self.ssl_certificate_authority,
533 tls_identity,
534 user: self
535 .user
536 .ok_or_else(|| sql_err!("USER option is required"))?,
537 aws_connection,
538 })
539 }
540 CreateConnectionType::SqlServer => {
541 scx.require_feature_flag(&vars::ENABLE_SQL_SERVER_SOURCE)?;
542
543 let aws_connection = get_aws_connection_reference(scx, &self)?;
544 if aws_connection.is_some() && self.password.is_some() {
545 sql_bail!(
546 "invalid CONNECTION: AWS IAM authentication is not supported with password"
547 );
548 }
549
550 let (encryption, certificate_validation_policy) = match self
551 .ssl_mode
552 .map(|mode| mode.to_uppercase())
553 .as_ref()
554 .map(|mode| mode.as_str())
555 {
556 None | Some("DISABLED") => (
557 mz_sql_server_util::config::EncryptionLevel::None,
558 mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
559 ),
560 Some("REQUIRED") => (
561 mz_sql_server_util::config::EncryptionLevel::Required,
562 mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
563 ),
564 Some("VERIFY") => (
565 mz_sql_server_util::config::EncryptionLevel::Required,
566 mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem,
567 ),
568 Some("VERIFY_CA") => {
569 if self.ssl_certificate_authority.is_none() {
570 sql_bail!(
571 "invalid CONNECTION: SSL MODE 'verify_ca' requires SSL CERTIFICATE AUTHORITY"
572 );
573 }
574 (
575 mz_sql_server_util::config::EncryptionLevel::Required,
576 mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA,
577 )
578 }
579 Some(mode) => {
580 sql_bail!("invalid CONNECTION: unknown SSL MODE {}", mode.quoted())
581 }
582 };
583
584 if let Some(privatelink) = self.aws_privatelink.as_ref() {
585 if privatelink.port.is_some() {
586 sql_bail!(
587 "invalid CONNECTION: SQL SERVER does not support PORT for AWS PRIVATELINK"
588 )
589 }
590 }
591
592 let port = self.port.unwrap_or(1433_u16);
596 let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;
597
598 ConnectionDetails::SqlServer(SqlServerConnectionDetails {
599 host: self
600 .host
601 .ok_or_else(|| sql_err!("HOST option is required"))?,
602 port,
603 database: self
604 .database
605 .ok_or_else(|| sql_err!("DATABASE option is required"))?,
606 user: self
607 .user
608 .ok_or_else(|| sql_err!("USER option is required"))?,
609 password: self
610 .password
611 .ok_or_else(|| sql_err!("PASSWORD option is required"))
612 .map(|pass| pass.into())?,
613 tunnel,
614 encryption,
615 certificate_validation_policy,
616 tls_root_cert: self.ssl_certificate_authority,
617 })
618 }
619 CreateConnectionType::IcebergCatalog => {
620 let catalog_type = self.catalog_type.clone().ok_or_else(|| {
621 sql_err!("invalid CONNECTION: ICEBERG connections must specify CATALOG TYPE")
622 })?;
623
624 let uri: reqwest::Url = match &self.url {
625 Some(url) => url
626 .parse()
627 .map_err(|e| sql_err!("parsing Iceberg catalog url: {e}"))?,
628 None => sql_bail!("invalid CONNECTION: must specify URL"),
629 };
630
631 let warehouse = self.warehouse.clone();
632 let credential = self.credential.clone();
633 let aws_connection = get_aws_connection_reference(scx, &self)?;
634
635 let catalog = match catalog_type {
636 IcebergCatalogType::S3TablesRest => {
637 let Some(warehouse) = warehouse else {
638 sql_bail!(
639 "invalid CONNECTION: ICEBERG s3tablesrest connections must specify WAREHOUSE"
640 );
641 };
642 let Some(aws_connection) = aws_connection else {
643 sql_bail!(
644 "invalid CONNECTION: ICEBERG s3tablesrest connections require an AWS connection"
645 );
646 };
647
648 IcebergCatalogImpl::S3TablesRest(S3TablesRestIcebergCatalog {
649 aws_connection,
650 warehouse,
651 })
652 }
653 IcebergCatalogType::Rest => {
654 let Some(credential) = credential else {
655 sql_bail!(
656 "invalid CONNECTION: ICEBERG rest connections require a CREDENTIAL"
657 );
658 };
659
660 IcebergCatalogImpl::Rest(RestIcebergCatalog {
661 credential,
662 scope: self.scope.clone(),
663 warehouse,
664 })
665 }
666 };
667
668 ConnectionDetails::IcebergCatalog(IcebergCatalogConnection { catalog, uri })
669 }
670 };
671
672 Ok(connection)
673 }
674
675 pub fn get_brokers(
676 &self,
677 scx: &StatementContext,
678 ) -> Result<Vec<mz_storage_types::connections::KafkaBroker<ReferencedConnection>>, PlanError>
679 {
680 let mut brokers = match (&self.broker, &self.brokers, &self.aws_privatelink) {
681 (Some(v), None, None) => v.to_vec(),
682 (None, Some(v), None) => v.to_vec(),
683 (None, None, Some(_)) => vec![],
684 (None, None, None) => {
685 sql_bail!("invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK")
686 }
687 _ => sql_bail!(
688 "invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK"
689 ),
690 };
691
692 let mut out = vec![];
696 for broker in &mut brokers {
697 if broker.address.contains(',') {
698 sql_bail!("invalid CONNECTION: cannot specify multiple Kafka broker addresses in one string.\n\n
699Instead, specify BROKERS using multiple strings, e.g. BROKERS ('kafka:9092', 'kafka:9093')");
700 }
701
702 let tunnel = match &broker.tunnel {
703 KafkaBrokerTunnel::Direct => Tunnel::Direct,
704 KafkaBrokerTunnel::AwsPrivatelink(aws_privatelink) => {
705 let KafkaBrokerAwsPrivatelinkOptionExtracted {
706 availability_zone,
707 port,
708 seen: _,
709 } = KafkaBrokerAwsPrivatelinkOptionExtracted::try_from(
710 aws_privatelink.options.clone(),
711 )?;
712
713 let id = match &aws_privatelink.connection {
714 ResolvedItemName::Item { id, .. } => id,
715 _ => sql_bail!(
716 "internal error: Kafka PrivateLink connection was not resolved"
717 ),
718 };
719 let entry = scx.catalog.get_item(id);
720 match entry.connection()? {
721 Connection::AwsPrivatelink(connection) => {
722 if let Some(az) = &availability_zone {
723 if !connection.availability_zones.contains(az) {
724 sql_bail!(
725 "AWS PrivateLink availability zone {} does not match any of the \
726 availability zones on the AWS PrivateLink connection {}",
727 az.quoted(),
728 scx.catalog
729 .resolve_full_name(entry.name())
730 .to_string()
731 .quoted()
732 )
733 }
734 }
735 Tunnel::AwsPrivatelink(AwsPrivatelink {
736 connection_id: *id,
737 availability_zone,
738 port,
739 })
740 }
741 _ => {
742 sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item)
743 }
744 }
745 }
746 KafkaBrokerTunnel::SshTunnel(ssh) => {
747 let id = match &ssh {
748 ResolvedItemName::Item { id, .. } => id,
749 _ => sql_bail!(
750 "internal error: Kafka SSH tunnel connection was not resolved"
751 ),
752 };
753 let ssh_tunnel = scx.catalog.get_item(id);
754 match ssh_tunnel.connection()? {
755 Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
756 connection_id: *id,
757 connection: *id,
758 }),
759 _ => {
760 sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item)
761 }
762 }
763 }
764 };
765
766 out.push(mz_storage_types::connections::KafkaBroker {
767 address: broker.address.clone(),
768 tunnel,
769 });
770 }
771
772 Ok(out)
773 }
774}
775
776fn get_aws_connection_reference(
777 scx: &StatementContext,
778 conn_options: &ConnectionOptionExtracted,
779) -> Result<Option<AwsConnectionReference<ReferencedConnection>>, PlanError> {
780 let Some(aws_connection_id) = conn_options.aws_connection else {
781 return Ok(None);
782 };
783
784 let id = CatalogItemId::from(aws_connection_id);
785 let item = scx.catalog.get_item(&id);
786 Ok(match item.connection()? {
787 Connection::Aws(_) => Some(AwsConnectionReference {
788 connection_id: id,
789 connection: id,
790 }),
791 _ => sql_bail!("{} is not an AWS connection", item.name().item),
792 })
793}
794
795fn plan_kafka_security(
796 scx: &StatementContext,
797 v: &ConnectionOptionExtracted,
798) -> Result<
799 (
800 Option<KafkaTlsConfig>,
801 Option<KafkaSaslConfig<ReferencedConnection>>,
802 ),
803 PlanError,
804> {
805 const SASL_CONFIGS: [ConnectionOptionName; 4] = [
806 ConnectionOptionName::AwsConnection,
807 ConnectionOptionName::SaslMechanisms,
808 ConnectionOptionName::SaslUsername,
809 ConnectionOptionName::SaslPassword,
810 ];
811
812 const ALL_CONFIGS: [ConnectionOptionName; 7] = concat_arrays!(
813 [
814 ConnectionOptionName::SslKey,
815 ConnectionOptionName::SslCertificate,
816 ConnectionOptionName::SslCertificateAuthority,
817 ],
818 SASL_CONFIGS
819 );
820
821 enum SecurityProtocol {
822 Plaintext,
823 Ssl,
824 SaslPlaintext,
825 SaslSsl,
826 }
827
828 let security_protocol = v.security_protocol.as_ref().map(|v| v.to_uppercase());
829 let security_protocol = match security_protocol.as_deref() {
830 Some("PLAINTEXT") => SecurityProtocol::Plaintext,
831 Some("SSL") => SecurityProtocol::Ssl,
832 Some("SASL_PLAINTEXT") => SecurityProtocol::SaslPlaintext,
833 Some("SASL_SSL") => SecurityProtocol::SaslSsl,
834 Some(p) => sql_bail!("unknown security protocol: {}", p),
835 None if SASL_CONFIGS.iter().any(|c| v.seen.contains(c)) => SecurityProtocol::SaslSsl,
840 None => SecurityProtocol::Ssl,
841 };
842
843 let mut outstanding = ALL_CONFIGS
844 .into_iter()
845 .filter(|c| v.seen.contains(c))
846 .collect::<BTreeSet<ConnectionOptionName>>();
847
848 let tls = match security_protocol {
849 SecurityProtocol::Ssl | SecurityProtocol::SaslSsl => {
850 outstanding.remove(&ConnectionOptionName::SslCertificate);
851 let identity = match &v.ssl_certificate {
852 None => None,
853 Some(cert) => {
854 outstanding.remove(&ConnectionOptionName::SslKey);
855 let Some(key) = &v.ssl_key else {
856 sql_bail!("SSL KEY must be specified with SSL CERTIFICATE");
857 };
858 Some(TlsIdentity {
859 cert: cert.clone(),
860 key: (*key).into(),
861 })
862 }
863 };
864 outstanding.remove(&ConnectionOptionName::SslCertificateAuthority);
865 Some(KafkaTlsConfig {
866 identity,
867 root_cert: v.ssl_certificate_authority.clone(),
868 })
869 }
870 _ => None,
871 };
872
873 let sasl = match security_protocol {
874 SecurityProtocol::SaslPlaintext | SecurityProtocol::SaslSsl => {
875 outstanding.remove(&ConnectionOptionName::AwsConnection);
876 match get_aws_connection_reference(scx, v)? {
877 Some(aws) => {
878 scx.require_feature_flag(&ENABLE_AWS_MSK_IAM_AUTH)?;
879 Some(KafkaSaslConfig {
880 mechanism: "OAUTHBEARER".into(),
881 username: "".into(),
882 password: None,
883 aws: Some(aws),
884 })
885 }
886 None => {
887 outstanding.remove(&ConnectionOptionName::SaslMechanisms);
888 outstanding.remove(&ConnectionOptionName::SaslUsername);
889 outstanding.remove(&ConnectionOptionName::SaslPassword);
890 let Some(mechanism) = &v.sasl_mechanisms else {
893 sql_bail!("SASL MECHANISMS must be specified");
894 };
895 let Some(username) = &v.sasl_username else {
896 sql_bail!("SASL USERNAME must be specified");
897 };
898 let Some(password) = &v.sasl_password else {
899 sql_bail!("SASL PASSWORD must be specified");
900 };
901 Some(KafkaSaslConfig {
902 mechanism: mechanism.to_uppercase(),
913 username: username.clone(),
914 password: Some((*password).into()),
915 aws: None,
916 })
917 }
918 }
919 }
920 _ => None,
921 };
922
923 if let Some(outstanding) = outstanding.first() {
924 sql_bail!("option {outstanding} not supported with this configuration");
925 }
926
927 Ok((tls, sasl))
928}