1use std::collections::{BTreeMap, BTreeSet};
13
14use anyhow::Context;
15use array_concat::concat_arrays;
16use itertools::Itertools;
17use maplit::btreemap;
18use mz_ore::num::NonNeg;
19use mz_ore::str::StrExt;
20use mz_postgres_util::tunnel::PostgresFlavor;
21use mz_repr::CatalogItemId;
22use mz_sql_parser::ast::ConnectionOptionName::*;
23use mz_sql_parser::ast::display::AstDisplay;
24use mz_sql_parser::ast::{
25 ConnectionDefaultAwsPrivatelink, ConnectionOption, ConnectionOptionName, CreateConnectionType,
26 KafkaBroker, KafkaBrokerAwsPrivatelinkOption, KafkaBrokerAwsPrivatelinkOptionName,
27 KafkaBrokerTunnel,
28};
29use mz_ssh_util::keys::SshKeyPair;
30use mz_storage_types::connections::aws::{
31 AwsAssumeRole, AwsAuth, AwsConnection, AwsConnectionReference, AwsCredentials,
32};
33use mz_storage_types::connections::inline::ReferencedConnection;
34use mz_storage_types::connections::string_or_secret::StringOrSecret;
35use mz_storage_types::connections::{
36 AwsPrivatelink, AwsPrivatelinkConnection, CsrConnection, CsrConnectionHttpAuth,
37 KafkaConnection, KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, MySqlConnection,
38 MySqlSslMode, PostgresConnection, SqlServerConnectionDetails, SshConnection, SshTunnel,
39 TlsIdentity, Tunnel,
40};
41
42use crate::names::Aug;
43use crate::plan::statement::{Connection, ResolvedItemName};
44use crate::plan::with_options;
45use crate::plan::{ConnectionDetails, PlanError, SshKey, StatementContext};
46use crate::session::vars::{self, ENABLE_AWS_MSK_IAM_AUTH};
47
48generate_extracted_config!(
49 ConnectionOption,
50 (AccessKeyId, StringOrSecret),
51 (AssumeRoleArn, String),
52 (AssumeRoleSessionName, String),
53 (AvailabilityZones, Vec<String>),
54 (AwsConnection, with_options::Object),
55 (AwsPrivatelink, ConnectionDefaultAwsPrivatelink<Aug>),
56 (Broker, Vec<KafkaBroker<Aug>>),
58 (Brokers, Vec<KafkaBroker<Aug>>),
59 (Database, String),
60 (Endpoint, String),
61 (Host, String),
62 (Password, with_options::Secret),
63 (Port, u16),
64 (ProgressTopic, String),
65 (ProgressTopicReplicationFactor, i32),
66 (PublicKey1, String),
67 (PublicKey2, String),
68 (Region, String),
69 (SaslMechanisms, String),
70 (SaslPassword, with_options::Secret),
71 (SaslUsername, StringOrSecret),
72 (SecretAccessKey, with_options::Secret),
73 (SecurityProtocol, String),
74 (ServiceName, String),
75 (SshTunnel, with_options::Object),
76 (SslCertificate, StringOrSecret),
77 (SslCertificateAuthority, StringOrSecret),
78 (SslKey, with_options::Secret),
79 (SslMode, String),
80 (SessionToken, StringOrSecret),
81 (Url, String),
82 (User, StringOrSecret)
83);
84
85generate_extracted_config!(
86 KafkaBrokerAwsPrivatelinkOption,
87 (AvailabilityZone, String),
88 (Port, u16)
89);
90
91pub(crate) const INALTERABLE_OPTIONS: &[ConnectionOptionName] =
93 &[ProgressTopic, ProgressTopicReplicationFactor];
94
95pub(crate) const MUTUALLY_EXCLUSIVE_SETS: &[&[ConnectionOptionName]] = &[&[Broker, Brokers]];
97
98pub(super) fn validate_options_per_connection_type(
99 t: CreateConnectionType,
100 mut options: BTreeSet<ConnectionOptionName>,
101) -> Result<(), PlanError> {
102 use mz_sql_parser::ast::ConnectionOptionName::*;
103 let permitted_options = match t {
104 CreateConnectionType::Aws => [
105 AccessKeyId,
106 SecretAccessKey,
107 SessionToken,
108 Endpoint,
109 Region,
110 AssumeRoleArn,
111 AssumeRoleSessionName,
112 ]
113 .as_slice(),
114 CreateConnectionType::AwsPrivatelink => &[AvailabilityZones, Port, ServiceName],
115 CreateConnectionType::Csr => &[
116 AwsPrivatelink,
117 Password,
118 Port,
119 SshTunnel,
120 SslCertificate,
121 SslCertificateAuthority,
122 SslKey,
123 Url,
124 User,
125 ],
126 CreateConnectionType::Kafka => &[
127 AwsConnection,
128 Broker,
129 Brokers,
130 ProgressTopic,
131 ProgressTopicReplicationFactor,
132 AwsPrivatelink,
133 SshTunnel,
134 SslKey,
135 SslCertificate,
136 SslCertificateAuthority,
137 SaslMechanisms,
138 SaslUsername,
139 SaslPassword,
140 SecurityProtocol,
141 ],
142 CreateConnectionType::Postgres | CreateConnectionType::Yugabyte => &[
143 AwsPrivatelink,
144 Database,
145 Host,
146 Password,
147 Port,
148 SshTunnel,
149 SslCertificate,
150 SslCertificateAuthority,
151 SslKey,
152 SslMode,
153 User,
154 ],
155 CreateConnectionType::Ssh => &[Host, Port, User, PublicKey1, PublicKey2],
156 CreateConnectionType::MySql => &[
157 AwsPrivatelink,
158 Host,
159 Password,
160 Port,
161 SshTunnel,
162 SslCertificate,
163 SslCertificateAuthority,
164 SslKey,
165 SslMode,
166 User,
167 AwsConnection,
168 ],
169 CreateConnectionType::SqlServer => &[
170 AwsPrivatelink,
171 Database,
172 Host,
173 Password,
174 Port,
175 SshTunnel,
176 SslCertificate,
177 SslCertificateAuthority,
178 SslKey,
179 SslMode,
180 User,
181 ],
182 };
183
184 for o in permitted_options {
185 options.remove(o);
186 }
187
188 if !options.is_empty() {
189 sql_bail!(
190 "{} connections do not support {} values",
191 t,
192 options.iter().join(", ")
193 )
194 }
195
196 Ok(())
197}
198
199impl ConnectionOptionExtracted {
200 pub(super) fn ensure_only_valid_options(
201 &self,
202 t: CreateConnectionType,
203 ) -> Result<(), PlanError> {
204 validate_options_per_connection_type(t, self.seen.clone())
205 }
206
207 pub fn try_into_connection_details(
208 self,
209 scx: &StatementContext,
210 connection_type: CreateConnectionType,
211 ) -> Result<ConnectionDetails, PlanError> {
212 self.ensure_only_valid_options(connection_type)?;
213
214 let connection: ConnectionDetails = match connection_type {
215 CreateConnectionType::Aws => {
216 let credentials = match (
217 self.access_key_id,
218 self.secret_access_key,
219 self.session_token,
220 ) {
221 (Some(access_key_id), Some(secret_access_key), session_token) => {
222 Some(AwsCredentials {
223 access_key_id,
224 secret_access_key: secret_access_key.into(),
225 session_token,
226 })
227 }
228 (None, None, None) => None,
229 _ => {
230 sql_bail!(
231 "must specify both ACCESS KEY ID and SECRET ACCESS KEY with optional SESSION TOKEN"
232 );
233 }
234 };
235
236 let assume_role = match (self.assume_role_arn, self.assume_role_session_name) {
237 (Some(arn), session_name) => Some(AwsAssumeRole { arn, session_name }),
238 (None, Some(_)) => {
239 sql_bail!(
240 "must specify ASSUME ROLE ARN with optional ASSUME ROLE SESSION NAME"
241 );
242 }
243 _ => None,
244 };
245
246 let auth = match (credentials, assume_role) {
247 (None, None) => sql_bail!(
248 "must specify either ASSUME ROLE ARN or ACCESS KEY ID and SECRET ACCESS KEY"
249 ),
250 (Some(credentials), None) => AwsAuth::Credentials(credentials),
251 (None, Some(assume_role)) => AwsAuth::AssumeRole(assume_role),
252 (Some(_), Some(_)) => {
253 sql_bail!("cannot specify both ACCESS KEY ID and ASSUME ROLE ARN");
254 }
255 };
256
257 ConnectionDetails::Aws(AwsConnection {
258 auth,
259 endpoint: match self.endpoint {
260 Some(endpoint) if !endpoint.is_empty() => Some(endpoint),
265 _ => None,
266 },
267 region: self.region,
268 })
269 }
270 CreateConnectionType::AwsPrivatelink => {
271 let connection = AwsPrivatelinkConnection {
272 service_name: self
273 .service_name
274 .ok_or_else(|| sql_err!("SERVICE NAME option is required"))?,
275 availability_zones: self
276 .availability_zones
277 .ok_or_else(|| sql_err!("AVAILABILITY ZONES option is required"))?,
278 };
279 if let Some(supported_azs) = scx.catalog.aws_privatelink_availability_zones() {
280 let mut unique_azs: BTreeSet<String> = BTreeSet::new();
281 let mut duplicate_azs: BTreeSet<String> = BTreeSet::new();
282 for connection_az in &connection.availability_zones {
284 if unique_azs.contains(connection_az) {
285 duplicate_azs.insert(connection_az.to_string());
286 } else {
287 unique_azs.insert(connection_az.to_string());
288 }
289 if !supported_azs.contains(connection_az) {
290 return Err(PlanError::InvalidPrivatelinkAvailabilityZone {
291 name: connection_az.to_string(),
292 supported_azs,
293 });
294 }
295 }
296 if duplicate_azs.len() > 0 {
297 return Err(PlanError::DuplicatePrivatelinkAvailabilityZone {
298 duplicate_azs,
299 });
300 }
301 }
302 ConnectionDetails::AwsPrivatelink(connection)
303 }
304 CreateConnectionType::Kafka => {
305 let (tls, sasl) = plan_kafka_security(scx, &self)?;
306
307 ConnectionDetails::Kafka(KafkaConnection {
308 brokers: self.get_brokers(scx)?,
309 default_tunnel: scx
310 .build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?,
311 progress_topic: self.progress_topic,
312 progress_topic_options: KafkaTopicOptions {
313 partition_count: Some(NonNeg::try_from(1).expect("1 is positive")),
317 replication_factor: self.progress_topic_replication_factor.map(|val| {
318 if val <= 0 {
319 Err(sql_err!("invalid CONNECTION: PROGRESS TOPIC REPLICATION FACTOR must be greater than 0"))?
320 }
321 NonNeg::try_from(val).map_err(|e| sql_err!("{e}"))
322 }).transpose()?,
323 topic_config: btreemap! {
324 "cleanup.policy".to_string() => "compact".to_string(),
325 },
326 },
327 options: BTreeMap::new(),
328 tls,
329 sasl,
330 })
331 }
332 CreateConnectionType::Csr => {
333 let url: reqwest::Url = match self.url {
334 Some(url) => url
335 .parse()
336 .map_err(|e| sql_err!("parsing schema registry url: {e}"))?,
337 None => sql_bail!("invalid CONNECTION: must specify URL"),
338 };
339 let _ = url
340 .host_str()
341 .ok_or_else(|| sql_err!("invalid CONNECTION: URL must specify domain name"))?;
342 if url.path() != "/" {
343 sql_bail!("invalid CONNECTION: URL must have an empty path");
344 }
345 let cert = self.ssl_certificate;
346 let key = self.ssl_key.map(|secret| secret.into());
347 let tls_identity = match (cert, key) {
348 (None, None) => None,
349 (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
350 _ => sql_bail!(
351 "invalid CONNECTION: reading from SSL-auth Confluent Schema Registry requires both SSL KEY and SSL CERTIFICATE"
352 ),
353 };
354 let http_auth = self.user.map(|username| CsrConnectionHttpAuth {
355 username,
356 password: self.password.map(|secret| secret.into()),
357 });
358
359 if let Some(privatelink) = self.aws_privatelink.as_ref() {
361 if privatelink.port.is_some() {
362 sql_bail!(
363 "invalid CONNECTION: PORT in AWS PRIVATELINK is only supported for kafka"
364 )
365 }
366 }
367 let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;
368
369 ConnectionDetails::Csr(CsrConnection {
370 url,
371 tls_root_cert: self.ssl_certificate_authority,
372 tls_identity,
373 http_auth,
374 tunnel,
375 })
376 }
377 CreateConnectionType::Postgres | CreateConnectionType::Yugabyte => {
378 if matches!(connection_type, CreateConnectionType::Yugabyte) {
379 scx.require_feature_flag(&vars::ENABLE_YUGABYTE_CONNECTION)?;
380 }
381
382 let cert = self.ssl_certificate;
383 let key = self.ssl_key.map(|secret| secret.into());
384 let tls_identity = match (cert, key) {
385 (None, None) => None,
386 (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
387 _ => sql_bail!(
388 "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
389 ),
390 };
391 let tls_mode = match self.ssl_mode.as_ref().map(|m| m.as_str()) {
392 None | Some("disable") => tokio_postgres::config::SslMode::Disable,
393 Some("require") | Some("required") => tokio_postgres::config::SslMode::Require,
396 Some("verify_ca") | Some("verify-ca") => {
397 tokio_postgres::config::SslMode::VerifyCa
398 }
399 Some("verify_full") | Some("verify-full") => {
400 tokio_postgres::config::SslMode::VerifyFull
401 }
402 Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
403 };
404
405 if let Some(privatelink) = self.aws_privatelink.as_ref() {
407 if privatelink.port.is_some() {
408 sql_bail!(
409 "invalid CONNECTION: PORT in AWS PRIVATELINK is only supported for kafka"
410 )
411 }
412 }
413 let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;
414
415 ConnectionDetails::Postgres(PostgresConnection {
416 database: self
417 .database
418 .ok_or_else(|| sql_err!("DATABASE option is required"))?,
419 password: self.password.map(|password| password.into()),
420 host: self
421 .host
422 .ok_or_else(|| sql_err!("HOST option is required"))?,
423 port: self.port.unwrap_or(5432_u16),
424 tunnel,
425 tls_mode,
426 tls_root_cert: self.ssl_certificate_authority,
427 tls_identity,
428 user: self
429 .user
430 .ok_or_else(|| sql_err!("USER option is required"))?,
431 flavor: match connection_type {
432 CreateConnectionType::Postgres => PostgresFlavor::Vanilla,
433 CreateConnectionType::Yugabyte => PostgresFlavor::Yugabyte,
434 _ => unreachable!(),
435 },
436 })
437 }
438 CreateConnectionType::Ssh => {
439 let ensure_key = |public_key| match public_key {
440 Some(public_key) => Ok::<_, anyhow::Error>(SshKey::PublicOnly(public_key)),
441 None => {
442 let key = SshKeyPair::new().context("creating SSH key")?;
443 Ok(SshKey::Both(key))
444 }
445 };
446 ConnectionDetails::Ssh {
447 connection: SshConnection {
448 host: self
449 .host
450 .ok_or_else(|| sql_err!("HOST option is required"))?,
451 port: self.port.unwrap_or(22_u16),
452 user: match self
453 .user
454 .ok_or_else(|| sql_err!("USER option is required"))?
455 {
456 StringOrSecret::String(user) => user,
457 StringOrSecret::Secret(_) => {
458 sql_bail!(
459 "SSH connections do not support supplying USER value as SECRET"
460 )
461 }
462 },
463 },
464 key_1: ensure_key(self.public_key1)?,
465 key_2: ensure_key(self.public_key2)?,
466 }
467 }
468 CreateConnectionType::MySql => {
469 let aws_connection = get_aws_connection_reference(scx, &self)?;
470 if aws_connection.is_some() && self.password.is_some() {
471 sql_bail!(
472 "invalid CONNECTION: AWS IAM authentication is not supported with password"
473 );
474 }
475
476 let cert = self.ssl_certificate;
477 let key = self.ssl_key.map(|secret| secret.into());
478 let tls_identity = match (cert, key) {
479 (None, None) => None,
480 (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
481 _ => sql_bail!(
482 "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
483 ),
484 };
485 let tls_mode = match self
488 .ssl_mode
489 .map(|f| f.to_uppercase())
490 .as_ref()
491 .map(|m| m.as_str())
492 {
493 None | Some("DISABLED") => {
494 if aws_connection.is_some() {
495 sql_bail!(
496 "invalid CONNECTION: AWS IAM authentication requires SSL to be enabled"
497 )
498 }
499 MySqlSslMode::Disabled
500 }
501 Some("REQUIRED") | Some("REQUIRE") => MySqlSslMode::Required,
504 Some("VERIFY_CA") | Some("VERIFY-CA") => MySqlSslMode::VerifyCa,
505 Some("VERIFY_IDENTITY") | Some("VERIFY-IDENTITY") => {
506 MySqlSslMode::VerifyIdentity
507 }
508 Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
509 };
510
511 if let Some(privatelink) = self.aws_privatelink.as_ref() {
513 if privatelink.port.is_some() {
514 sql_bail!(
515 "invalid CONNECTION: PORT in AWS PRIVATELINK is only supported for kafka"
516 )
517 }
518 }
519 let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;
520
521 ConnectionDetails::MySql(MySqlConnection {
522 password: self.password.map(|password| password.into()),
523 host: self
524 .host
525 .ok_or_else(|| sql_err!("HOST option is required"))?,
526 port: self.port.unwrap_or(3306_u16),
527 tunnel,
528 tls_mode,
529 tls_root_cert: self.ssl_certificate_authority,
530 tls_identity,
531 user: self
532 .user
533 .ok_or_else(|| sql_err!("USER option is required"))?,
534 aws_connection,
535 })
536 }
537 CreateConnectionType::SqlServer => {
538 scx.require_feature_flag(&vars::ENABLE_SQL_SERVER_SOURCE)?;
539
540 let aws_connection = get_aws_connection_reference(scx, &self)?;
541 if aws_connection.is_some() {
544 return Err(PlanError::Unsupported {
545 feature: "AWS CONNECTION with SQL Server".to_string(),
546 discussion_no: None,
547 });
548 }
549
550 let encryption = mz_sql_server_util::config::EncryptionLevel::Preferred;
552
553 let port = self.port.unwrap_or(1433_u16);
557 let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;
558
559 ConnectionDetails::SqlServer(SqlServerConnectionDetails {
560 host: self
561 .host
562 .ok_or_else(|| sql_err!("HOST option is required"))?,
563 port,
564 database: self
565 .database
566 .ok_or_else(|| sql_err!("DATABASE option is required"))?,
567 user: self
568 .user
569 .ok_or_else(|| sql_err!("USER option is required"))?,
570 password: self
571 .password
572 .ok_or_else(|| sql_err!("PASSWORD option is required"))
573 .map(|pass| pass.into())?,
574 tunnel,
575 encryption,
576 })
577 }
578 };
579
580 Ok(connection)
581 }
582
583 pub fn get_brokers(
584 &self,
585 scx: &StatementContext,
586 ) -> Result<Vec<mz_storage_types::connections::KafkaBroker<ReferencedConnection>>, PlanError>
587 {
588 let mut brokers = match (&self.broker, &self.brokers, &self.aws_privatelink) {
589 (Some(v), None, None) => v.to_vec(),
590 (None, Some(v), None) => v.to_vec(),
591 (None, None, Some(_)) => vec![],
592 (None, None, None) => {
593 sql_bail!("invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK")
594 }
595 _ => sql_bail!(
596 "invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK"
597 ),
598 };
599
600 let mut out = vec![];
604 for broker in &mut brokers {
605 if broker.address.contains(',') {
606 sql_bail!("invalid CONNECTION: cannot specify multiple Kafka broker addresses in one string.\n\n
607Instead, specify BROKERS using multiple strings, e.g. BROKERS ('kafka:9092', 'kafka:9093')");
608 }
609
610 let tunnel = match &broker.tunnel {
611 KafkaBrokerTunnel::Direct => Tunnel::Direct,
612 KafkaBrokerTunnel::AwsPrivatelink(aws_privatelink) => {
613 let KafkaBrokerAwsPrivatelinkOptionExtracted {
614 availability_zone,
615 port,
616 seen: _,
617 } = KafkaBrokerAwsPrivatelinkOptionExtracted::try_from(
618 aws_privatelink.options.clone(),
619 )?;
620
621 let id = match &aws_privatelink.connection {
622 ResolvedItemName::Item { id, .. } => id,
623 _ => sql_bail!(
624 "internal error: Kafka PrivateLink connection was not resolved"
625 ),
626 };
627 let entry = scx.catalog.get_item(id);
628 match entry.connection()? {
629 Connection::AwsPrivatelink(connection) => {
630 if let Some(az) = &availability_zone {
631 if !connection.availability_zones.contains(az) {
632 sql_bail!(
633 "AWS PrivateLink availability zone {} does not match any of the \
634 availability zones on the AWS PrivateLink connection {}",
635 az.quoted(),
636 scx.catalog
637 .resolve_full_name(entry.name())
638 .to_string()
639 .quoted()
640 )
641 }
642 }
643 Tunnel::AwsPrivatelink(AwsPrivatelink {
644 connection_id: *id,
645 availability_zone,
646 port,
647 })
648 }
649 _ => {
650 sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item)
651 }
652 }
653 }
654 KafkaBrokerTunnel::SshTunnel(ssh) => {
655 let id = match &ssh {
656 ResolvedItemName::Item { id, .. } => id,
657 _ => sql_bail!(
658 "internal error: Kafka SSH tunnel connection was not resolved"
659 ),
660 };
661 let ssh_tunnel = scx.catalog.get_item(id);
662 match ssh_tunnel.connection()? {
663 Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
664 connection_id: *id,
665 connection: *id,
666 }),
667 _ => {
668 sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item)
669 }
670 }
671 }
672 };
673
674 out.push(mz_storage_types::connections::KafkaBroker {
675 address: broker.address.clone(),
676 tunnel,
677 });
678 }
679
680 Ok(out)
681 }
682}
683
684fn get_aws_connection_reference(
685 scx: &StatementContext,
686 conn_options: &ConnectionOptionExtracted,
687) -> Result<Option<AwsConnectionReference<ReferencedConnection>>, PlanError> {
688 let Some(aws_connection_id) = conn_options.aws_connection else {
689 return Ok(None);
690 };
691
692 let id = CatalogItemId::from(aws_connection_id);
693 let item = scx.catalog.get_item(&id);
694 Ok(match item.connection()? {
695 Connection::Aws(_) => Some(AwsConnectionReference {
696 connection_id: id,
697 connection: id,
698 }),
699 _ => sql_bail!("{} is not an AWS connection", item.name().item),
700 })
701}
702
703fn plan_kafka_security(
704 scx: &StatementContext,
705 v: &ConnectionOptionExtracted,
706) -> Result<
707 (
708 Option<KafkaTlsConfig>,
709 Option<KafkaSaslConfig<ReferencedConnection>>,
710 ),
711 PlanError,
712> {
713 const SASL_CONFIGS: [ConnectionOptionName; 4] = [
714 ConnectionOptionName::AwsConnection,
715 ConnectionOptionName::SaslMechanisms,
716 ConnectionOptionName::SaslUsername,
717 ConnectionOptionName::SaslPassword,
718 ];
719
720 const ALL_CONFIGS: [ConnectionOptionName; 7] = concat_arrays!(
721 [
722 ConnectionOptionName::SslKey,
723 ConnectionOptionName::SslCertificate,
724 ConnectionOptionName::SslCertificateAuthority,
725 ],
726 SASL_CONFIGS
727 );
728
729 enum SecurityProtocol {
730 Plaintext,
731 Ssl,
732 SaslPlaintext,
733 SaslSsl,
734 }
735
736 let security_protocol = v.security_protocol.as_ref().map(|v| v.to_uppercase());
737 let security_protocol = match security_protocol.as_deref() {
738 Some("PLAINTEXT") => SecurityProtocol::Plaintext,
739 Some("SSL") => SecurityProtocol::Ssl,
740 Some("SASL_PLAINTEXT") => SecurityProtocol::SaslPlaintext,
741 Some("SASL_SSL") => SecurityProtocol::SaslSsl,
742 Some(p) => sql_bail!("unknown security protocol: {}", p),
743 None if SASL_CONFIGS.iter().any(|c| v.seen.contains(c)) => SecurityProtocol::SaslSsl,
748 None => SecurityProtocol::Ssl,
749 };
750
751 let mut outstanding = ALL_CONFIGS
752 .into_iter()
753 .filter(|c| v.seen.contains(c))
754 .collect::<BTreeSet<ConnectionOptionName>>();
755
756 let tls = match security_protocol {
757 SecurityProtocol::Ssl | SecurityProtocol::SaslSsl => {
758 outstanding.remove(&ConnectionOptionName::SslCertificate);
759 let identity = match &v.ssl_certificate {
760 None => None,
761 Some(cert) => {
762 outstanding.remove(&ConnectionOptionName::SslKey);
763 let Some(key) = &v.ssl_key else {
764 sql_bail!("SSL KEY must be specified with SSL CERTIFICATE");
765 };
766 Some(TlsIdentity {
767 cert: cert.clone(),
768 key: (*key).into(),
769 })
770 }
771 };
772 outstanding.remove(&ConnectionOptionName::SslCertificateAuthority);
773 Some(KafkaTlsConfig {
774 identity,
775 root_cert: v.ssl_certificate_authority.clone(),
776 })
777 }
778 _ => None,
779 };
780
781 let sasl = match security_protocol {
782 SecurityProtocol::SaslPlaintext | SecurityProtocol::SaslSsl => {
783 outstanding.remove(&ConnectionOptionName::AwsConnection);
784 match get_aws_connection_reference(scx, v)? {
785 Some(aws) => {
786 scx.require_feature_flag(&ENABLE_AWS_MSK_IAM_AUTH)?;
787 Some(KafkaSaslConfig {
788 mechanism: "OAUTHBEARER".into(),
789 username: "".into(),
790 password: None,
791 aws: Some(aws),
792 })
793 }
794 None => {
795 outstanding.remove(&ConnectionOptionName::SaslMechanisms);
796 outstanding.remove(&ConnectionOptionName::SaslUsername);
797 outstanding.remove(&ConnectionOptionName::SaslPassword);
798 let Some(mechanism) = &v.sasl_mechanisms else {
801 sql_bail!("SASL MECHANISMS must be specified");
802 };
803 let Some(username) = &v.sasl_username else {
804 sql_bail!("SASL USERNAME must be specified");
805 };
806 let Some(password) = &v.sasl_password else {
807 sql_bail!("SASL PASSWORD must be specified");
808 };
809 Some(KafkaSaslConfig {
810 mechanism: mechanism.to_uppercase(),
821 username: username.clone(),
822 password: Some((*password).into()),
823 aws: None,
824 })
825 }
826 }
827 }
828 _ => None,
829 };
830
831 if let Some(outstanding) = outstanding.first() {
832 sql_bail!("option {outstanding} not supported with this configuration");
833 }
834
835 Ok((tls, sasl))
836}