1use std::collections::{BTreeMap, BTreeSet};
13
14use anyhow::Context;
15use array_concat::concat_arrays;
16use itertools::Itertools;
17use maplit::btreemap;
18use mz_ore::num::NonNeg;
19use mz_ore::str::StrExt;
20use mz_repr::CatalogItemId;
21use mz_sql_parser::ast::ConnectionOptionName::*;
22use mz_sql_parser::ast::display::AstDisplay;
23use mz_sql_parser::ast::{
24 ConnectionDefaultAwsPrivatelink, ConnectionOption, ConnectionOptionName, CreateConnectionType,
25 KafkaBroker, KafkaBrokerAwsPrivatelinkOption, KafkaBrokerAwsPrivatelinkOptionName,
26 KafkaBrokerTunnel, KafkaMatchingBrokerRule,
27};
28use mz_ssh_util::keys::SshKeyPair;
29use mz_storage_types::connections::aws::{
30 AwsAssumeRole, AwsAuth, AwsConnection, AwsConnectionReference, AwsCredentials,
31};
32use mz_storage_types::connections::inline::ReferencedConnection;
33use mz_storage_types::connections::string_or_secret::StringOrSecret;
34use mz_storage_types::connections::{
35 AwsPrivatelink, AwsPrivatelinkConnection, AwsPrivatelinkRule, CsrConnection,
36 CsrConnectionHttpAuth, IcebergCatalogConnection, IcebergCatalogImpl, IcebergCatalogType,
37 KafkaConnection, KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, MySqlConnection,
38 MySqlSslMode, PostgresConnection, RestIcebergCatalog, S3TablesRestIcebergCatalog,
39 SqlServerConnectionDetails, SshConnection, SshTunnel, TlsIdentity, Tunnel,
40};
41
42use crate::names::Aug;
43use crate::plan::statement::{Connection, ResolvedItemName};
44use crate::plan::with_options::{self};
45use crate::plan::{ConnectionDetails, PlanError, SshKey, StatementContext};
46use crate::session::vars::{self, ENABLE_AWS_MSK_IAM_AUTH};
47
// Declares how `ConnectionOption` values are extracted into typed fields.
// Each `(Name, Type)` pair names one option and the Rust type its value is
// converted to. The rest of this file consumes the result as
// `ConnectionOptionExtracted`, reading one optional snake_case field per entry
// (e.g. `AccessKeyId` is read as `access_key_id`) plus a `seen` set of
// `ConnectionOptionName`s that the validation code checks against the options
// permitted for each connection type.
generate_extracted_config!(
    ConnectionOption,
    (AccessKeyId, StringOrSecret),
    (AssumeRoleArn, String),
    (AssumeRoleSessionName, String),
    (AvailabilityZones, Vec<String>),
    (AwsConnection, with_options::Object),
    (AwsPrivatelink, ConnectionDefaultAwsPrivatelink<Aug>),
    (Broker, Vec<KafkaBroker<Aug>>),
    (Brokers, with_options::BrokersList),
    (Credential, StringOrSecret),
    (Database, String),
    (Endpoint, String),
    (Host, String),
    (Password, with_options::Secret),
    (Port, u16),
    (ProgressTopic, String),
    (ProgressTopicReplicationFactor, i32),
    (PublicKey1, String),
    (PublicKey2, String),
    (Region, String),
    (SaslMechanisms, String),
    (SaslPassword, with_options::Secret),
    (SaslUsername, StringOrSecret),
    (Scope, String),
    (SecretAccessKey, with_options::Secret),
    (SecurityProtocol, String),
    (ServiceName, String),
    (SshTunnel, with_options::Object),
    (SslCertificate, StringOrSecret),
    (SslCertificateAuthority, StringOrSecret),
    (SslKey, with_options::Secret),
    (SslMode, String),
    (SessionToken, StringOrSecret),
    (CatalogType, IcebergCatalogType),
    (Url, String),
    (User, StringOrSecret),
    (Warehouse, String)
);
87
// Declares the typed extraction of the per-broker AWS PrivateLink options.
// `plan_privatelink` below consumes the result as
// `KafkaBrokerAwsPrivatelinkOptionExtracted`, built via `try_from` and
// destructured into `availability_zone`, `port` and `seen`.
generate_extracted_config!(
    KafkaBrokerAwsPrivatelinkOption,
    (AvailabilityZone, String),
    (Port, u16)
);
93
/// Connection options grouped under the label "inalterable": the progress
/// topic name and its replication factor.
///
/// NOTE(review): presumably consulted when planning `ALTER CONNECTION` to
/// reject changes to these options; the consumer is not visible in this file,
/// so confirm against the caller.
pub(crate) const INALTERABLE_OPTIONS: &[ConnectionOptionName] =
    &[ProgressTopic, ProgressTopicReplicationFactor];
97
/// Groups of connection options labelled as mutually exclusive. The only
/// group is `BROKER` / `BROKERS`.
///
/// NOTE(review): the code that enforces these sets is not visible in this
/// file; confirm against the caller how a violation is reported.
pub(crate) const MUTUALLY_EXCLUSIVE_SETS: &[&[ConnectionOptionName]] = &[&[Broker, Brokers]];
100
101pub(super) fn validate_options_per_connection_type(
102 t: CreateConnectionType,
103 mut options: BTreeSet<ConnectionOptionName>,
104) -> Result<(), PlanError> {
105 use mz_sql_parser::ast::ConnectionOptionName::*;
106 let permitted_options = match t {
107 CreateConnectionType::Aws => [
108 AccessKeyId,
109 SecretAccessKey,
110 SessionToken,
111 Endpoint,
112 Region,
113 AssumeRoleArn,
114 AssumeRoleSessionName,
115 ]
116 .as_slice(),
117 CreateConnectionType::AwsPrivatelink => &[AvailabilityZones, Port, ServiceName],
118 CreateConnectionType::Csr => &[
119 AwsPrivatelink,
120 Password,
121 Port,
122 SshTunnel,
123 SslCertificate,
124 SslCertificateAuthority,
125 SslKey,
126 Url,
127 User,
128 ],
129 CreateConnectionType::Kafka => &[
130 AwsConnection,
131 Broker,
132 Brokers,
133 ProgressTopic,
134 ProgressTopicReplicationFactor,
135 AwsPrivatelink,
136 SshTunnel,
137 SslKey,
138 SslCertificate,
139 SslCertificateAuthority,
140 SaslMechanisms,
141 SaslUsername,
142 SaslPassword,
143 SecurityProtocol,
144 ],
145 CreateConnectionType::Postgres => &[
146 AwsPrivatelink,
147 Database,
148 Host,
149 Password,
150 Port,
151 SshTunnel,
152 SslCertificate,
153 SslCertificateAuthority,
154 SslKey,
155 SslMode,
156 User,
157 ],
158 CreateConnectionType::Ssh => &[Host, Port, User, PublicKey1, PublicKey2],
159 CreateConnectionType::MySql => &[
160 AwsPrivatelink,
161 Host,
162 Password,
163 Port,
164 SshTunnel,
165 SslCertificate,
166 SslCertificateAuthority,
167 SslKey,
168 SslMode,
169 User,
170 AwsConnection,
171 ],
172 CreateConnectionType::SqlServer => &[
173 AwsPrivatelink,
174 Database,
175 Host,
176 Password,
177 Port,
178 SshTunnel,
179 SslCertificate,
180 SslCertificateAuthority,
181 SslKey,
182 SslMode,
183 User,
184 ],
185 CreateConnectionType::IcebergCatalog => &[
186 AwsConnection,
187 CatalogType,
188 Credential,
189 Scope,
190 Url,
191 Warehouse,
192 ],
193 };
194
195 for o in permitted_options {
196 options.remove(o);
197 }
198
199 if !options.is_empty() {
200 sql_bail!(
201 "{} connections do not support {} values",
202 t,
203 options.iter().join(", ")
204 )
205 }
206
207 Ok(())
208}
209
210impl ConnectionOptionExtracted {
211 pub(super) fn ensure_only_valid_options(
212 &self,
213 t: CreateConnectionType,
214 ) -> Result<(), PlanError> {
215 validate_options_per_connection_type(t, self.seen.clone())
216 }
217
/// Converts the extracted options into the [`ConnectionDetails`] for a
/// connection of type `connection_type`.
///
/// The options are first checked with `ensure_only_valid_options`, so an
/// option that the connection type does not permit is rejected before any
/// type-specific planning happens. Each match arm then validates the option
/// combination for its connection type and builds the corresponding
/// `ConnectionDetails` variant.
///
/// # Errors
///
/// Returns a [`PlanError`] when an unsupported option is present, a required
/// option is missing, options conflict, a value cannot be parsed (URLs, SSL
/// modes), a referenced catalog item is not a connection of the expected
/// kind, or a `require_feature_flag` check fails.
pub fn try_into_connection_details(
    self,
    scx: &StatementContext,
    connection_type: CreateConnectionType,
) -> Result<ConnectionDetails, PlanError> {
    self.ensure_only_valid_options(connection_type)?;

    let connection: ConnectionDetails = match connection_type {
        CreateConnectionType::Aws => {
            // Static credentials need both the access key ID and the secret
            // access key; the session token is optional. Supplying only some
            // of the pieces is an error.
            let credentials = match (
                self.access_key_id,
                self.secret_access_key,
                self.session_token,
            ) {
                (Some(access_key_id), Some(secret_access_key), session_token) => {
                    Some(AwsCredentials {
                        access_key_id,
                        secret_access_key: secret_access_key.into(),
                        session_token,
                    })
                }
                (None, None, None) => None,
                _ => {
                    sql_bail!(
                        "must specify both ACCESS KEY ID and SECRET ACCESS KEY with optional SESSION TOKEN"
                    );
                }
            };

            // A session name is only meaningful together with a role ARN.
            let assume_role = match (self.assume_role_arn, self.assume_role_session_name) {
                (Some(arn), session_name) => Some(AwsAssumeRole { arn, session_name }),
                (None, Some(_)) => {
                    sql_bail!(
                        "must specify ASSUME ROLE ARN with optional ASSUME ROLE SESSION NAME"
                    );
                }
                _ => None,
            };

            // Exactly one authentication method must be configured.
            let auth = match (credentials, assume_role) {
                (None, None) => sql_bail!(
                    "must specify either ASSUME ROLE ARN or ACCESS KEY ID and SECRET ACCESS KEY"
                ),
                (Some(credentials), None) => AwsAuth::Credentials(credentials),
                (None, Some(assume_role)) => AwsAuth::AssumeRole(assume_role),
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify both ACCESS KEY ID and ASSUME ROLE ARN");
                }
            };

            ConnectionDetails::Aws(AwsConnection {
                auth,
                // An empty endpoint string is treated the same as no endpoint.
                endpoint: match self.endpoint {
                    Some(endpoint) if !endpoint.is_empty() => Some(endpoint),
                    _ => None,
                },
                region: self.region,
            })
        }
        CreateConnectionType::AwsPrivatelink => {
            let connection = AwsPrivatelinkConnection {
                service_name: self
                    .service_name
                    .ok_or_else(|| sql_err!("SERVICE NAME option is required"))?,
                availability_zones: self
                    .availability_zones
                    .ok_or_else(|| sql_err!("AVAILABILITY ZONES option is required"))?,
            };
            // The availability zones are only validated when the catalog
            // reports a list of supported zones.
            if let Some(supported_azs) = scx.catalog.aws_privatelink_availability_zones() {
                let mut unique_azs: BTreeSet<String> = BTreeSet::new();
                let mut duplicate_azs: BTreeSet<String> = BTreeSet::new();
                for connection_az in &connection.availability_zones {
                    // Track repeats so that all duplicates can be reported
                    // together after the loop.
                    if unique_azs.contains(connection_az) {
                        duplicate_azs.insert(connection_az.to_string());
                    } else {
                        unique_azs.insert(connection_az.to_string());
                    }
                    // An unsupported zone is reported immediately and takes
                    // precedence over any duplicates found so far.
                    if !supported_azs.contains(connection_az) {
                        return Err(PlanError::InvalidPrivatelinkAvailabilityZone {
                            name: connection_az.to_string(),
                            supported_azs,
                        });
                    }
                }
                if duplicate_azs.len() > 0 {
                    return Err(PlanError::DuplicatePrivatelinkAvailabilityZone {
                        duplicate_azs,
                    });
                }
            }
            ConnectionDetails::AwsPrivatelink(connection)
        }
        CreateConnectionType::Kafka => {
            // Both helpers borrow `self`, so they run before any field is
            // moved out of it below.
            let (tls, sasl) = plan_kafka_security(scx, &self)?;
            let (static_brokers, matching_rules) = self.get_brokers_and_rules(scx)?;

            // Broker matching rules are gated behind a feature flag.
            if !matching_rules.is_empty() {
                scx.require_feature_flag(&vars::ENABLE_KAFKA_BROKER_MATCHING_RULES)?;
            }

            ConnectionDetails::Kafka(KafkaConnection {
                brokers: static_brokers,
                // An empty rule list is passed as `None` so that it does not
                // count as a tunnel option.
                default_tunnel: build_tunnel_definition(
                    scx,
                    self.ssh_tunnel,
                    self.aws_privatelink,
                    if matching_rules.is_empty() { None } else { Some(matching_rules) },
                )?,
                progress_topic: self.progress_topic,
                progress_topic_options: KafkaTopicOptions {
                    // The progress topic always gets a single partition.
                    partition_count: Some(NonNeg::try_from(1).expect("1 is positive")),
                    // Zero and negative replication factors are rejected; the
                    // `?` on the `Err` exits the closure early, and
                    // `transpose` turns the `Option<Result<..>>` into a
                    // `Result<Option<..>>` for the outer `?`.
                    replication_factor: self.progress_topic_replication_factor.map(|val| {
                        if val <= 0 {
                            Err(sql_err!("invalid CONNECTION: PROGRESS TOPIC REPLICATION FACTOR must be greater than 0"))?
                        }
                        NonNeg::try_from(val).map_err(|e| sql_err!("{e}"))
                    }).transpose()?,
                    // 134217728 bytes = 128 MiB.
                    topic_config: btreemap! {
                        "cleanup.policy".to_string() => "compact".to_string(),
                        "segment.bytes".to_string() => "134217728".to_string(), },
                },
                options: BTreeMap::new(),
                tls,
                sasl,
            })
        }
        CreateConnectionType::Csr => {
            let url: reqwest::Url = match self.url {
                Some(url) => url
                    .parse()
                    .map_err(|e| sql_err!("parsing schema registry url: {e}"))?,
                None => sql_bail!("invalid CONNECTION: must specify URL"),
            };
            // The URL must name a host and must not carry a path.
            let _ = url
                .host_str()
                .ok_or_else(|| sql_err!("invalid CONNECTION: URL must specify domain name"))?;
            if url.path() != "/" {
                sql_bail!("invalid CONNECTION: URL must have an empty path");
            }
            // A TLS client identity needs both the certificate and the key.
            let cert = self.ssl_certificate;
            let key = self.ssl_key.map(|secret| secret.into());
            let tls_identity = match (cert, key) {
                (None, None) => None,
                (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
                _ => sql_bail!(
                    "invalid CONNECTION: reading from SSL-auth Confluent Schema Registry requires both SSL KEY and SSL CERTIFICATE"
                ),
            };
            // HTTP auth is only configured when USER is given; a PASSWORD
            // without a USER is not used.
            let http_auth = self.user.map(|username| CsrConnectionHttpAuth {
                username,
                password: self.password.map(|secret| secret.into()),
            });

            if let Some(privatelink) = self.aws_privatelink.as_ref() {
                if privatelink.port.is_some() {
                    sql_bail!(
                        "invalid CONNECTION: CONFLUENT SCHEMA REGISTRY does not support PORT for AWS PRIVATELINK"
                    )
                }
            }
            let tunnel = build_tunnel_definition(
                scx,
                self.ssh_tunnel,
                self.aws_privatelink,
                None,
            )?;

            ConnectionDetails::Csr(CsrConnection {
                url,
                tls_root_cert: self.ssl_certificate_authority,
                tls_identity,
                http_auth,
                tunnel,
            })
        }
        CreateConnectionType::Postgres => {
            // A TLS client identity needs both the certificate and the key.
            let cert = self.ssl_certificate;
            let key = self.ssl_key.map(|secret| secret.into());
            let tls_identity = match (cert, key) {
                (None, None) => None,
                (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
                _ => sql_bail!(
                    "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
                ),
            };
            // SSL MODE is matched case-sensitively here and defaults to
            // `disable` when not given.
            let tls_mode = match self.ssl_mode.as_ref().map(|m| m.as_str()) {
                None | Some("disable") => tokio_postgres::config::SslMode::Disable,
                Some("require") | Some("required") => tokio_postgres::config::SslMode::Require,
                Some("verify_ca") | Some("verify-ca") => {
                    tokio_postgres::config::SslMode::VerifyCa
                }
                Some("verify_full") | Some("verify-full") => {
                    tokio_postgres::config::SslMode::VerifyFull
                }
                Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
            };

            if let Some(privatelink) = self.aws_privatelink.as_ref() {
                if privatelink.port.is_some() {
                    sql_bail!(
                        "invalid CONNECTION: POSTGRES does not support PORT for AWS PRIVATELINK"
                    )
                }
            }
            let tunnel = build_tunnel_definition(
                scx,
                self.ssh_tunnel,
                self.aws_privatelink,
                None,
            )?;

            ConnectionDetails::Postgres(PostgresConnection {
                database: self
                    .database
                    .ok_or_else(|| sql_err!("DATABASE option is required"))?,
                password: self.password.map(|password| password.into()),
                host: self
                    .host
                    .ok_or_else(|| sql_err!("HOST option is required"))?,
                // Falls back to port 5432 when PORT is not given.
                port: self.port.unwrap_or(5432_u16),
                tunnel,
                tls_mode,
                tls_root_cert: self.ssl_certificate_authority,
                tls_identity,
                user: self
                    .user
                    .ok_or_else(|| sql_err!("USER option is required"))?,
            })
        }
        CreateConnectionType::Ssh => {
            // A supplied public key is used as-is; otherwise a fresh key pair
            // is generated.
            let ensure_key = |public_key| match public_key {
                Some(public_key) => Ok::<_, anyhow::Error>(SshKey::PublicOnly(public_key)),
                None => {
                    let key = SshKeyPair::new().context("creating SSH key")?;
                    Ok(SshKey::Both(key))
                }
            };
            ConnectionDetails::Ssh {
                connection: SshConnection {
                    host: self
                        .host
                        .ok_or_else(|| sql_err!("HOST option is required"))?,
                    // Falls back to port 22 when PORT is not given.
                    port: self.port.unwrap_or(22_u16),
                    // USER must be a plain string for SSH connections.
                    user: match self
                        .user
                        .ok_or_else(|| sql_err!("USER option is required"))?
                    {
                        StringOrSecret::String(user) => user,
                        StringOrSecret::Secret(_) => {
                            sql_bail!(
                                "SSH connections do not support supplying USER value as SECRET"
                            )
                        }
                    },
                },
                key_1: ensure_key(self.public_key1)?,
                key_2: ensure_key(self.public_key2)?,
            }
        }
        CreateConnectionType::MySql => {
            // Resolved first because it borrows `self`, before fields are
            // moved out below. An AWS connection and a password are mutually
            // exclusive.
            let aws_connection = get_aws_connection_reference(scx, &self)?;
            if aws_connection.is_some() && self.password.is_some() {
                sql_bail!(
                    "invalid CONNECTION: AWS IAM authentication is not supported with password"
                );
            }

            // A TLS client identity needs both the certificate and the key.
            let cert = self.ssl_certificate;
            let key = self.ssl_key.map(|secret| secret.into());
            let tls_identity = match (cert, key) {
                (None, None) => None,
                (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
                _ => sql_bail!(
                    "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
                ),
            };
            // SSL MODE is upper-cased before matching, so it is
            // case-insensitive here.
            let tls_mode = match self
                .ssl_mode
                .map(|f| f.to_uppercase())
                .as_ref()
                .map(|m| m.as_str())
            {
                None | Some("DISABLED") => {
                    // An AWS connection cannot be combined with disabled SSL.
                    if aws_connection.is_some() {
                        sql_bail!(
                            "invalid CONNECTION: AWS IAM authentication requires SSL to be enabled"
                        )
                    }
                    MySqlSslMode::Disabled
                }
                Some("REQUIRED") | Some("REQUIRE") => MySqlSslMode::Required,
                Some("VERIFY_CA") | Some("VERIFY-CA") => MySqlSslMode::VerifyCa,
                Some("VERIFY_IDENTITY") | Some("VERIFY-IDENTITY") => {
                    MySqlSslMode::VerifyIdentity
                }
                Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
            };

            if let Some(privatelink) = self.aws_privatelink.as_ref() {
                if privatelink.port.is_some() {
                    sql_bail!(
                        "invalid CONNECTION: MYSQL does not support PORT for AWS PRIVATELINK"
                    )
                }
            }
            let tunnel = build_tunnel_definition(
                scx,
                self.ssh_tunnel,
                self.aws_privatelink,
                None,
            )?;

            ConnectionDetails::MySql(MySqlConnection {
                password: self.password.map(|password| password.into()),
                host: self
                    .host
                    .ok_or_else(|| sql_err!("HOST option is required"))?,
                // Falls back to port 3306 when PORT is not given.
                port: self.port.unwrap_or(3306_u16),
                tunnel,
                tls_mode,
                tls_root_cert: self.ssl_certificate_authority,
                tls_identity,
                user: self
                    .user
                    .ok_or_else(|| sql_err!("USER option is required"))?,
                aws_connection,
            })
        }
        CreateConnectionType::SqlServer => {
            scx.require_feature_flag(&vars::ENABLE_SQL_SERVER_SOURCE)?;

            // NOTE(review): `AwsConnection` is not in the permitted option
            // list for SQL Server in `validate_options_per_connection_type`,
            // and `aws_connection` is not stored in the resulting details, so
            // this check looks unreachable — confirm.
            let aws_connection = get_aws_connection_reference(scx, &self)?;
            if aws_connection.is_some() && self.password.is_some() {
                sql_bail!(
                    "invalid CONNECTION: AWS IAM authentication is not supported with password"
                );
            }

            // SSL MODE is upper-cased before matching and maps to an
            // encryption level plus a certificate validation policy.
            let (encryption, certificate_validation_policy) = match self
                .ssl_mode
                .map(|mode| mode.to_uppercase())
                .as_ref()
                .map(|mode| mode.as_str())
            {
                None | Some("DISABLED") => (
                    mz_sql_server_util::config::EncryptionLevel::None,
                    mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
                ),
                Some("REQUIRED") => (
                    mz_sql_server_util::config::EncryptionLevel::Required,
                    mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
                ),
                Some("VERIFY") => (
                    mz_sql_server_util::config::EncryptionLevel::Required,
                    mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem,
                ),
                Some("VERIFY_CA") => {
                    // Verifying against a CA requires one to be supplied.
                    if self.ssl_certificate_authority.is_none() {
                        sql_bail!(
                            "invalid CONNECTION: SSL MODE 'verify_ca' requires SSL CERTIFICATE AUTHORITY"
                        );
                    }
                    (
                        mz_sql_server_util::config::EncryptionLevel::Required,
                        mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA,
                    )
                }
                Some(mode) => {
                    sql_bail!("invalid CONNECTION: unknown SSL MODE {}", mode.quoted())
                }
            };

            if let Some(privatelink) = self.aws_privatelink.as_ref() {
                if privatelink.port.is_some() {
                    sql_bail!(
                        "invalid CONNECTION: SQL SERVER does not support PORT for AWS PRIVATELINK"
                    )
                }
            }

            // Falls back to port 1433 when PORT is not given.
            let port = self.port.unwrap_or(1433_u16);
            let tunnel = build_tunnel_definition(
                scx,
                self.ssh_tunnel,
                self.aws_privatelink,
                None,
            )?;

            ConnectionDetails::SqlServer(SqlServerConnectionDetails {
                host: self
                    .host
                    .ok_or_else(|| sql_err!("HOST option is required"))?,
                port,
                database: self
                    .database
                    .ok_or_else(|| sql_err!("DATABASE option is required"))?,
                user: self
                    .user
                    .ok_or_else(|| sql_err!("USER option is required"))?,
                // Unlike Postgres and MySQL above, PASSWORD is mandatory here.
                password: self
                    .password
                    .ok_or_else(|| sql_err!("PASSWORD option is required"))
                    .map(|pass| pass.into())?,
                tunnel,
                encryption,
                certificate_validation_policy,
                tls_root_cert: self.ssl_certificate_authority,
            })
        }
        CreateConnectionType::IcebergCatalog => {
            // Fields are cloned rather than moved in this arm because `self`
            // is still borrowed by `get_aws_connection_reference` below.
            let catalog_type = self.catalog_type.clone().ok_or_else(|| {
                sql_err!("invalid CONNECTION: ICEBERG connections must specify CATALOG TYPE")
            })?;

            let uri: reqwest::Url = match &self.url {
                Some(url) => url
                    .parse()
                    .map_err(|e| sql_err!("parsing Iceberg catalog url: {e}"))?,
                None => sql_bail!("invalid CONNECTION: must specify URL"),
            };

            let warehouse = self.warehouse.clone();
            let credential = self.credential.clone();
            let aws_connection = get_aws_connection_reference(scx, &self)?;

            let catalog = match catalog_type {
                // S3 Tables REST catalogs need a warehouse and an AWS
                // connection.
                IcebergCatalogType::S3TablesRest => {
                    let Some(warehouse) = warehouse else {
                        sql_bail!(
                            "invalid CONNECTION: ICEBERG s3tablesrest connections must specify WAREHOUSE"
                        );
                    };
                    let Some(aws_connection) = aws_connection else {
                        sql_bail!(
                            "invalid CONNECTION: ICEBERG s3tablesrest connections require an AWS connection"
                        );
                    };

                    IcebergCatalogImpl::S3TablesRest(S3TablesRestIcebergCatalog {
                        aws_connection,
                        warehouse,
                    })
                }
                // Plain REST catalogs need a credential; scope and warehouse
                // are optional.
                IcebergCatalogType::Rest => {
                    let Some(credential) = credential else {
                        sql_bail!(
                            "invalid CONNECTION: ICEBERG rest connections require a CREDENTIAL"
                        );
                    };

                    IcebergCatalogImpl::Rest(RestIcebergCatalog {
                        credential,
                        scope: self.scope.clone(),
                        warehouse,
                    })
                }
            };

            ConnectionDetails::IcebergCatalog(IcebergCatalogConnection { catalog, uri })
        }
    };

    Ok(connection)
}
703
704 pub fn get_brokers_and_rules(
705 &self,
706 scx: &StatementContext,
707 ) -> Result<
708 (
709 Vec<mz_storage_types::connections::KafkaBroker<ReferencedConnection>>,
710 Vec<KafkaMatchingBrokerRule<Aug>>,
711 ),
712 PlanError,
713 > {
714 let mut all_brokers: Vec<KafkaBroker<Aug>> = vec![];
716 let mut matching_rules: Vec<KafkaMatchingBrokerRule<Aug>> = vec![];
717
718 match (&self.broker, &self.brokers, &self.aws_privatelink) {
720 (Some(broker), None, None) => all_brokers.extend(broker.iter().cloned()),
722 (None, Some(broker_list), None) => {
724 all_brokers.extend(broker_list.static_entries.iter().cloned());
725 matching_rules.extend(broker_list.matching_rules.iter().cloned());
726 }
727 (None, None, Some(_privatelink)) => {
729 }
731 (None, None, None) => {
733 sql_bail!("invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK")
734 }
735 _ => sql_bail!(
737 "invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK"
738 ),
739 };
740
741 if !matching_rules.is_empty() && all_brokers.is_empty() {
743 sql_bail!(
744 "invalid CONNECTION: BROKERS must contain at least one static broker address"
745 );
746 }
747
748 let mut out = vec![];
751 for broker in &all_brokers {
752 if broker.address.contains(',') {
753 sql_bail!(
754 "invalid CONNECTION: cannot specify multiple Kafka broker addresses in one string.\n\nInstead, specify BROKERS using multiple strings, e.g. BROKERS ('kafka:9092', 'kafka:9093')"
755 );
756 }
757
758 let tunnel = match &broker.tunnel {
759 KafkaBrokerTunnel::Direct => Tunnel::Direct,
760 KafkaBrokerTunnel::AwsPrivatelink(aws_privatelink) => {
761 Tunnel::AwsPrivatelink(plan_privatelink(scx, aws_privatelink)?)
762 }
763 KafkaBrokerTunnel::SshTunnel(ssh) => {
764 let id = match &ssh {
765 ResolvedItemName::Item { id, .. } => id,
766 _ => sql_bail!(
767 "internal error: Kafka SSH tunnel connection was not resolved"
768 ),
769 };
770 let ssh_tunnel = scx.catalog.get_item(id);
771 match ssh_tunnel.connection()? {
772 Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
773 connection_id: *id,
774 connection: *id,
775 }),
776 _ => {
777 sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item)
778 }
779 }
780 }
781 };
782
783 out.push(mz_storage_types::connections::KafkaBroker {
784 address: broker.address.clone(),
785 tunnel,
786 });
787 }
788
789 Ok((out, matching_rules))
790 }
791}
792
793fn get_aws_connection_reference(
794 scx: &StatementContext,
795 conn_options: &ConnectionOptionExtracted,
796) -> Result<Option<AwsConnectionReference<ReferencedConnection>>, PlanError> {
797 let Some(aws_connection_id) = conn_options.aws_connection else {
798 return Ok(None);
799 };
800
801 let id = CatalogItemId::from(aws_connection_id);
802 let item = scx.catalog.get_item(&id);
803 Ok(match item.connection()? {
804 Connection::Aws(_) => Some(AwsConnectionReference {
805 connection_id: id,
806 connection: id,
807 }),
808 _ => sql_bail!("{} is not an AWS connection", item.name().item),
809 })
810}
811
812fn plan_kafka_security(
813 scx: &StatementContext,
814 v: &ConnectionOptionExtracted,
815) -> Result<
816 (
817 Option<KafkaTlsConfig>,
818 Option<KafkaSaslConfig<ReferencedConnection>>,
819 ),
820 PlanError,
821> {
822 const SASL_CONFIGS: [ConnectionOptionName; 4] = [
823 ConnectionOptionName::AwsConnection,
824 ConnectionOptionName::SaslMechanisms,
825 ConnectionOptionName::SaslUsername,
826 ConnectionOptionName::SaslPassword,
827 ];
828
829 const ALL_CONFIGS: [ConnectionOptionName; 7] = concat_arrays!(
830 [
831 ConnectionOptionName::SslKey,
832 ConnectionOptionName::SslCertificate,
833 ConnectionOptionName::SslCertificateAuthority,
834 ],
835 SASL_CONFIGS
836 );
837
838 enum SecurityProtocol {
839 Plaintext,
840 Ssl,
841 SaslPlaintext,
842 SaslSsl,
843 }
844
845 let security_protocol = v.security_protocol.as_ref().map(|v| v.to_uppercase());
846 let security_protocol = match security_protocol.as_deref() {
847 Some("PLAINTEXT") => SecurityProtocol::Plaintext,
848 Some("SSL") => SecurityProtocol::Ssl,
849 Some("SASL_PLAINTEXT") => SecurityProtocol::SaslPlaintext,
850 Some("SASL_SSL") => SecurityProtocol::SaslSsl,
851 Some(p) => sql_bail!("unknown security protocol: {}", p),
852 None if SASL_CONFIGS.iter().any(|c| v.seen.contains(c)) => SecurityProtocol::SaslSsl,
857 None => SecurityProtocol::Ssl,
858 };
859
860 let mut outstanding = ALL_CONFIGS
861 .into_iter()
862 .filter(|c| v.seen.contains(c))
863 .collect::<BTreeSet<ConnectionOptionName>>();
864
865 let tls = match security_protocol {
866 SecurityProtocol::Ssl | SecurityProtocol::SaslSsl => {
867 outstanding.remove(&ConnectionOptionName::SslCertificate);
868 let identity = match &v.ssl_certificate {
869 None => None,
870 Some(cert) => {
871 outstanding.remove(&ConnectionOptionName::SslKey);
872 let Some(key) = &v.ssl_key else {
873 sql_bail!("SSL KEY must be specified with SSL CERTIFICATE");
874 };
875 Some(TlsIdentity {
876 cert: cert.clone(),
877 key: (*key).into(),
878 })
879 }
880 };
881 outstanding.remove(&ConnectionOptionName::SslCertificateAuthority);
882 Some(KafkaTlsConfig {
883 identity,
884 root_cert: v.ssl_certificate_authority.clone(),
885 })
886 }
887 _ => None,
888 };
889
890 let sasl = match security_protocol {
891 SecurityProtocol::SaslPlaintext | SecurityProtocol::SaslSsl => {
892 outstanding.remove(&ConnectionOptionName::AwsConnection);
893 match get_aws_connection_reference(scx, v)? {
894 Some(aws) => {
895 scx.require_feature_flag(&ENABLE_AWS_MSK_IAM_AUTH)?;
896 Some(KafkaSaslConfig {
897 mechanism: "OAUTHBEARER".into(),
898 username: "".into(),
899 password: None,
900 aws: Some(aws),
901 })
902 }
903 None => {
904 outstanding.remove(&ConnectionOptionName::SaslMechanisms);
905 outstanding.remove(&ConnectionOptionName::SaslUsername);
906 outstanding.remove(&ConnectionOptionName::SaslPassword);
907 let Some(mechanism) = &v.sasl_mechanisms else {
910 sql_bail!("SASL MECHANISMS must be specified");
911 };
912 let Some(username) = &v.sasl_username else {
913 sql_bail!("SASL USERNAME must be specified");
914 };
915 let Some(password) = &v.sasl_password else {
916 sql_bail!("SASL PASSWORD must be specified");
917 };
918 Some(KafkaSaslConfig {
919 mechanism: mechanism.to_uppercase(),
930 username: username.clone(),
931 password: Some((*password).into()),
932 aws: None,
933 })
934 }
935 }
936 }
937 _ => None,
938 };
939
940 if let Some(outstanding) = outstanding.first() {
941 sql_bail!("option {outstanding} not supported with this configuration");
942 }
943
944 Ok((tls, sasl))
945}
946pub fn plan_default_privatelink(
947 scx: &StatementContext,
948 pl: &mz_sql_parser::ast::ConnectionDefaultAwsPrivatelink<Aug>,
949) -> Result<AwsPrivatelink, PlanError> {
950 let id = pl.connection.item_id().clone();
951 let entry = scx.catalog.get_item(&id);
952 match entry.connection()? {
953 Connection::AwsPrivatelink(_) => Ok(AwsPrivatelink {
954 connection_id: id,
955 availability_zone: None,
957 port: pl.port,
959 }),
960 _ => sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item),
961 }
962}
963
964pub fn plan_privatelink(
965 scx: &StatementContext,
966 pl: &mz_sql_parser::ast::KafkaBrokerAwsPrivatelink<Aug>,
967) -> Result<AwsPrivatelink, PlanError> {
968 let KafkaBrokerAwsPrivatelinkOptionExtracted {
969 availability_zone,
970 port,
971 seen: _,
972 } = KafkaBrokerAwsPrivatelinkOptionExtracted::try_from(pl.options.clone())?;
973
974 let id = match &pl.connection {
975 ResolvedItemName::Item { id, .. } => id,
976 _ => sql_bail!("internal error: Kafka PrivateLink connection was not resolved"),
977 };
978 let entry = scx.catalog.get_item(id);
979 match entry.connection()? {
980 Connection::AwsPrivatelink(connection) => {
981 if let Some(az) = &availability_zone {
982 if !connection.availability_zones.contains(az) {
983 sql_bail!(
984 "AWS PrivateLink availability zone {} does not match any of the \
985 availability zones on the AWS PrivateLink connection {}",
986 az.quoted(),
987 scx.catalog
988 .resolve_full_name(entry.name())
989 .to_string()
990 .quoted()
991 )
992 }
993 }
994 Ok(AwsPrivatelink {
995 connection_id: *id,
996 availability_zone,
997 port,
998 })
999 }
1000 _ => {
1001 sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item)
1002 }
1003 }
1004}
1005
1006pub(crate) fn build_tunnel_definition(
1007 scx: &StatementContext,
1008 ssh_tunnel: Option<with_options::Object>,
1009 aws_privatelink: Option<ConnectionDefaultAwsPrivatelink<Aug>>,
1010 matching_rules: Option<Vec<KafkaMatchingBrokerRule<Aug>>>,
1011) -> Result<Tunnel<ReferencedConnection>, PlanError> {
1012 Ok(match (ssh_tunnel, aws_privatelink, matching_rules) {
1013 (None, None, None) => Tunnel::Direct,
1014 (Some(ssh_tunnel), None, None) => {
1015 let id = CatalogItemId::from(ssh_tunnel);
1016 let ssh_tunnel = scx.catalog.get_item(&id);
1017 match ssh_tunnel.connection()? {
1018 Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
1019 connection_id: id,
1020 connection: id,
1021 }),
1022 _ => sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item),
1023 }
1024 }
1025 (None, Some(aws_privatelink), None) => {
1026 Tunnel::AwsPrivatelink(plan_default_privatelink(scx, &aws_privatelink)?)
1027 }
1028 (None, None, Some(rules)) => {
1029 if rules.is_empty() {
1030 sql_bail!("BROKERS MATCHING rules list cannot be empty");
1031 }
1032
1033 let rules = rules
1034 .iter()
1035 .map(|rule| {
1036 Ok(AwsPrivatelinkRule {
1037 pattern: rule.pattern.clone(),
1038 to: plan_privatelink(scx, &rule.tunnel)?,
1039 })
1040 })
1041 .collect::<Result<Vec<_>, PlanError>>()?;
1042 Tunnel::AwsPrivatelinks(mz_storage_types::connections::AwsPrivatelinks { rules })
1043 }
1044 _ => {
1045 sql_bail!("cannot specify both SSH TUNNEL and AWS PRIVATELINK");
1046 }
1047 })
1048}