1use std::collections::{BTreeMap, BTreeSet};
13
14use anyhow::Context;
15use array_concat::concat_arrays;
16use itertools::Itertools;
17use maplit::btreemap;
18use mz_ore::num::NonNeg;
19use mz_ore::str::StrExt;
20use mz_repr::CatalogItemId;
21use mz_sql_parser::ast::ConnectionOptionName::*;
22use mz_sql_parser::ast::display::AstDisplay;
23use mz_sql_parser::ast::{
24 ConnectionDefaultAwsPrivatelink, ConnectionOption, ConnectionOptionName, CreateConnectionType,
25 KafkaBroker, KafkaBrokerAwsPrivatelinkOption, KafkaBrokerAwsPrivatelinkOptionName,
26 KafkaBrokerTunnel, KafkaMatchingBrokerRule,
27};
28use mz_ssh_util::keys::SshKeyPair;
29use mz_storage_types::connections::aws::{
30 AwsAssumeRole, AwsAuth, AwsConnection, AwsConnectionReference, AwsCredentials,
31};
32use mz_storage_types::connections::inline::ReferencedConnection;
33use mz_storage_types::connections::string_or_secret::StringOrSecret;
34use mz_storage_types::connections::{
35 AwsPrivatelink, AwsPrivatelinkConnection, AwsPrivatelinkRule, CsrConnection,
36 CsrConnectionHttpAuth, GlueSchemaRegistryConnection, IcebergCatalogConnection,
37 IcebergCatalogImpl, IcebergCatalogType, KafkaConnection, KafkaSaslConfig, KafkaTlsConfig,
38 KafkaTopicOptions, MySqlConnection, MySqlSslMode, PostgresConnection, RestIcebergCatalog,
39 S3TablesRestIcebergCatalog, SqlServerConnectionDetails, SshConnection, SshTunnel, TlsIdentity,
40 Tunnel,
41};
42
43use crate::names::Aug;
44use crate::plan::statement::{Connection, ResolvedItemName};
45use crate::plan::with_options::{self};
46use crate::plan::{ConnectionDetails, PlanError, SshKey, StatementContext};
47use crate::session::vars;
48
49generate_extracted_config!(
50 ConnectionOption,
51 (AccessKeyId, StringOrSecret),
52 (AssumeRoleArn, String),
53 (AssumeRoleSessionName, String),
54 (AvailabilityZones, Vec<String>),
55 (AwsConnection, with_options::Object),
56 (AwsPrivatelink, ConnectionDefaultAwsPrivatelink<Aug>),
57 (Broker, Vec<KafkaBroker<Aug>>),
58 (Brokers, with_options::BrokersList),
59 (Credential, StringOrSecret),
60 (Database, String),
61 (Endpoint, String),
62 (Host, String),
63 (Password, with_options::Secret),
64 (Port, u16),
65 (ProgressTopic, String),
66 (ProgressTopicReplicationFactor, i32),
67 (PublicKey1, String),
68 (PublicKey2, String),
69 (Region, String),
70 (Registry, String),
71 (SaslMechanisms, String),
72 (SaslPassword, with_options::Secret),
73 (SaslUsername, StringOrSecret),
74 (Scope, String),
75 (SecretAccessKey, with_options::Secret),
76 (SecurityProtocol, String),
77 (ServiceName, String),
78 (SshTunnel, with_options::Object),
79 (SslCertificate, StringOrSecret),
80 (SslCertificateAuthority, StringOrSecret),
81 (SslKey, with_options::Secret),
82 (SslMode, String),
83 (SessionToken, StringOrSecret),
84 (CatalogType, IcebergCatalogType),
85 (Url, String),
86 (User, StringOrSecret),
87 (Warehouse, String)
88);
89
90generate_extracted_config!(
91 KafkaBrokerAwsPrivatelinkOption,
92 (AvailabilityZone, String),
93 (Port, u16)
94);
95
96pub(crate) const INALTERABLE_OPTIONS: &[ConnectionOptionName] =
98 &[ProgressTopic, ProgressTopicReplicationFactor];
99
100pub(crate) const MUTUALLY_EXCLUSIVE_SETS: &[&[ConnectionOptionName]] = &[&[Broker, Brokers]];
102
103pub(super) fn validate_options_per_connection_type(
104 t: CreateConnectionType,
105 mut options: BTreeSet<ConnectionOptionName>,
106) -> Result<(), PlanError> {
107 use mz_sql_parser::ast::ConnectionOptionName::*;
108 let permitted_options = match t {
109 CreateConnectionType::Aws => [
110 AccessKeyId,
111 SecretAccessKey,
112 SessionToken,
113 Endpoint,
114 Region,
115 AssumeRoleArn,
116 AssumeRoleSessionName,
117 ]
118 .as_slice(),
119 CreateConnectionType::AwsPrivatelink => &[AvailabilityZones, Port, ServiceName],
120 CreateConnectionType::GlueSchemaRegistry => &[AwsConnection, Registry],
121 CreateConnectionType::Csr => &[
122 AwsPrivatelink,
123 Password,
124 Port,
125 SshTunnel,
126 SslCertificate,
127 SslCertificateAuthority,
128 SslKey,
129 Url,
130 User,
131 ],
132 CreateConnectionType::Kafka => &[
133 AwsConnection,
134 Broker,
135 Brokers,
136 ProgressTopic,
137 ProgressTopicReplicationFactor,
138 AwsPrivatelink,
139 SshTunnel,
140 SslKey,
141 SslCertificate,
142 SslCertificateAuthority,
143 SaslMechanisms,
144 SaslUsername,
145 SaslPassword,
146 SecurityProtocol,
147 ],
148 CreateConnectionType::Postgres => &[
149 AwsPrivatelink,
150 Database,
151 Host,
152 Password,
153 Port,
154 SshTunnel,
155 SslCertificate,
156 SslCertificateAuthority,
157 SslKey,
158 SslMode,
159 User,
160 ],
161 CreateConnectionType::Ssh => &[Host, Port, User, PublicKey1, PublicKey2],
162 CreateConnectionType::MySql => &[
163 AwsPrivatelink,
164 Host,
165 Password,
166 Port,
167 SshTunnel,
168 SslCertificate,
169 SslCertificateAuthority,
170 SslKey,
171 SslMode,
172 User,
173 AwsConnection,
174 ],
175 CreateConnectionType::SqlServer => &[
176 AwsPrivatelink,
177 Database,
178 Host,
179 Password,
180 Port,
181 SshTunnel,
182 SslCertificate,
183 SslCertificateAuthority,
184 SslKey,
185 SslMode,
186 User,
187 ],
188 CreateConnectionType::IcebergCatalog => &[
189 AwsConnection,
190 CatalogType,
191 Credential,
192 Scope,
193 Url,
194 Warehouse,
195 ],
196 };
197
198 for o in permitted_options {
199 options.remove(o);
200 }
201
202 if !options.is_empty() {
203 sql_bail!(
204 "{} connections do not support {} values",
205 t,
206 options.iter().join(", ")
207 )
208 }
209
210 Ok(())
211}
212
213impl ConnectionOptionExtracted {
214 pub(super) fn ensure_only_valid_options(
215 &self,
216 t: CreateConnectionType,
217 ) -> Result<(), PlanError> {
218 validate_options_per_connection_type(t, self.seen.clone())
219 }
220
221 pub fn try_into_connection_details(
222 self,
223 scx: &StatementContext,
224 connection_type: CreateConnectionType,
225 ) -> Result<ConnectionDetails, PlanError> {
226 self.ensure_only_valid_options(connection_type)?;
227
228 let connection: ConnectionDetails = match connection_type {
229 CreateConnectionType::Aws => {
230 let credentials = match (
231 self.access_key_id,
232 self.secret_access_key,
233 self.session_token,
234 ) {
235 (Some(access_key_id), Some(secret_access_key), session_token) => {
236 Some(AwsCredentials {
237 access_key_id,
238 secret_access_key: secret_access_key.into(),
239 session_token,
240 })
241 }
242 (None, None, None) => None,
243 _ => {
244 sql_bail!(
245 "must specify both ACCESS KEY ID and SECRET ACCESS KEY with optional SESSION TOKEN"
246 );
247 }
248 };
249
250 let assume_role = match (self.assume_role_arn, self.assume_role_session_name) {
251 (Some(arn), session_name) => Some(AwsAssumeRole { arn, session_name }),
252 (None, Some(_)) => {
253 sql_bail!(
254 "must specify ASSUME ROLE ARN with optional ASSUME ROLE SESSION NAME"
255 );
256 }
257 _ => None,
258 };
259
260 let auth = match (credentials, assume_role) {
261 (None, None) => sql_bail!(
262 "must specify either ASSUME ROLE ARN or ACCESS KEY ID and SECRET ACCESS KEY"
263 ),
264 (Some(credentials), None) => AwsAuth::Credentials(credentials),
265 (None, Some(assume_role)) => AwsAuth::AssumeRole(assume_role),
266 (Some(_), Some(_)) => {
267 sql_bail!("cannot specify both ACCESS KEY ID and ASSUME ROLE ARN");
268 }
269 };
270
271 ConnectionDetails::Aws(AwsConnection {
272 auth,
273 endpoint: match self.endpoint {
274 Some(endpoint) if !endpoint.is_empty() => Some(endpoint),
279 _ => None,
280 },
281 region: self.region,
282 })
283 }
284 CreateConnectionType::AwsPrivatelink => {
285 let connection = AwsPrivatelinkConnection {
286 service_name: self
287 .service_name
288 .ok_or_else(|| sql_err!("SERVICE NAME option is required"))?,
289 availability_zones: self
290 .availability_zones
291 .ok_or_else(|| sql_err!("AVAILABILITY ZONES option is required"))?,
292 };
293 if let Some(supported_azs) = scx.catalog.aws_privatelink_availability_zones() {
294 let mut unique_azs: BTreeSet<String> = BTreeSet::new();
295 let mut duplicate_azs: BTreeSet<String> = BTreeSet::new();
296 for connection_az in &connection.availability_zones {
298 if unique_azs.contains(connection_az) {
299 duplicate_azs.insert(connection_az.to_string());
300 } else {
301 unique_azs.insert(connection_az.to_string());
302 }
303 if !supported_azs.contains(connection_az) {
304 return Err(PlanError::InvalidPrivatelinkAvailabilityZone {
305 name: connection_az.to_string(),
306 supported_azs,
307 });
308 }
309 }
310 if duplicate_azs.len() > 0 {
311 return Err(PlanError::DuplicatePrivatelinkAvailabilityZone {
312 duplicate_azs,
313 });
314 }
315 }
316 ConnectionDetails::AwsPrivatelink(connection)
317 }
318 CreateConnectionType::Kafka => {
319 let (tls, sasl) = plan_kafka_security(scx, &self)?;
320 let (static_brokers, matching_rules) = self.get_brokers_and_rules(scx)?;
321
322 if !matching_rules.is_empty() {
323 scx.require_feature_flag(&vars::ENABLE_KAFKA_BROKER_MATCHING_RULES)?;
324 }
325
326 ConnectionDetails::Kafka(KafkaConnection {
327 brokers: static_brokers,
328 default_tunnel: build_tunnel_definition(
329 scx,
330 self.ssh_tunnel,
331 self.aws_privatelink,
332 if matching_rules.is_empty() { None } else { Some(matching_rules) },
333 )?,
334 progress_topic: self.progress_topic,
335 progress_topic_options: KafkaTopicOptions {
336 partition_count: Some(NonNeg::try_from(1).expect("1 is positive")),
340 replication_factor: self.progress_topic_replication_factor.map(|val| {
341 if val <= 0 {
342 Err(sql_err!("invalid CONNECTION: PROGRESS TOPIC REPLICATION FACTOR must be greater than 0"))?
343 }
344 NonNeg::try_from(val).map_err(|e| sql_err!("{e}"))
345 }).transpose()?,
346 topic_config: btreemap! {
347 "cleanup.policy".to_string() => "compact".to_string(),
348 "segment.bytes".to_string() => "134217728".to_string(), },
350 },
351 options: BTreeMap::new(),
352 tls,
353 sasl,
354 })
355 }
356 CreateConnectionType::Csr => {
357 let url: reqwest::Url = match self.url {
358 Some(url) => url
359 .parse()
360 .map_err(|e| sql_err!("parsing schema registry url: {e}"))?,
361 None => sql_bail!("invalid CONNECTION: must specify URL"),
362 };
363 let _ = url
364 .host_str()
365 .ok_or_else(|| sql_err!("invalid CONNECTION: URL must specify domain name"))?;
366 if url.path() != "/" {
367 sql_bail!("invalid CONNECTION: URL must have an empty path");
368 }
369 let cert = self.ssl_certificate;
370 let key = self.ssl_key.map(|secret| secret.into());
371 let tls_identity = match (cert, key) {
372 (None, None) => None,
373 (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
374 _ => sql_bail!(
375 "invalid CONNECTION: reading from SSL-auth Confluent Schema Registry requires both SSL KEY and SSL CERTIFICATE"
376 ),
377 };
378 let http_auth = self.user.map(|username| CsrConnectionHttpAuth {
379 username,
380 password: self.password.map(|secret| secret.into()),
381 });
382
383 if let Some(privatelink) = self.aws_privatelink.as_ref() {
385 if privatelink.port.is_some() {
386 sql_bail!(
387 "invalid CONNECTION: CONFLUENT SCHEMA REGISTRY does not support PORT for AWS PRIVATELINK"
388 )
389 }
390 }
391 let tunnel = build_tunnel_definition(
392 scx,
393 self.ssh_tunnel,
394 self.aws_privatelink,
395 None, )?;
397
398 ConnectionDetails::Csr(CsrConnection {
399 url,
400 tls_root_cert: self.ssl_certificate_authority,
401 tls_identity,
402 http_auth,
403 tunnel,
404 })
405 }
406 CreateConnectionType::GlueSchemaRegistry => {
407 scx.require_feature_flag(&vars::ENABLE_GLUE_SCHEMA_REGISTRY)?;
408
409 let aws_connection = get_aws_connection_reference(scx, &self)?
410 .ok_or_else(|| sql_err!("AWS CONNECTION option is required"))?;
411 let registry_name = self
412 .registry
413 .ok_or_else(|| sql_err!("REGISTRY option is required"))?;
414 if registry_name.is_empty() {
415 sql_bail!("invalid CONNECTION: REGISTRY must not be empty");
416 }
417
418 ConnectionDetails::GlueSchemaRegistry(GlueSchemaRegistryConnection {
419 aws_connection,
420 registry_name,
421 })
422 }
423 CreateConnectionType::Postgres => {
424 let cert = self.ssl_certificate;
425 let key = self.ssl_key.map(|secret| secret.into());
426 let tls_identity = match (cert, key) {
427 (None, None) => None,
428 (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
429 _ => sql_bail!(
430 "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
431 ),
432 };
433 let tls_mode = match self.ssl_mode.as_ref().map(|m| m.as_str()) {
434 None | Some("disable") => tokio_postgres::config::SslMode::Disable,
435 Some("require") | Some("required") => tokio_postgres::config::SslMode::Require,
438 Some("verify_ca") | Some("verify-ca") => {
439 tokio_postgres::config::SslMode::VerifyCa
440 }
441 Some("verify_full") | Some("verify-full") => {
442 tokio_postgres::config::SslMode::VerifyFull
443 }
444 Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
445 };
446
447 if let Some(privatelink) = self.aws_privatelink.as_ref() {
449 if privatelink.port.is_some() {
450 sql_bail!(
451 "invalid CONNECTION: POSTGRES does not support PORT for AWS PRIVATELINK"
452 )
453 }
454 }
455 let tunnel = build_tunnel_definition(
456 scx,
457 self.ssh_tunnel,
458 self.aws_privatelink,
459 None, )?;
461
462 ConnectionDetails::Postgres(PostgresConnection {
463 database: self
464 .database
465 .ok_or_else(|| sql_err!("DATABASE option is required"))?,
466 password: self.password.map(|password| password.into()),
467 host: self
468 .host
469 .ok_or_else(|| sql_err!("HOST option is required"))?,
470 port: self.port.unwrap_or(5432_u16),
471 tunnel,
472 tls_mode,
473 tls_root_cert: self.ssl_certificate_authority,
474 tls_identity,
475 user: self
476 .user
477 .ok_or_else(|| sql_err!("USER option is required"))?,
478 })
479 }
480 CreateConnectionType::Ssh => {
481 let ensure_key = |public_key| match public_key {
482 Some(public_key) => Ok::<_, anyhow::Error>(SshKey::PublicOnly(public_key)),
483 None => {
484 let key = SshKeyPair::new().context("creating SSH key")?;
485 Ok(SshKey::Both(key))
486 }
487 };
488 ConnectionDetails::Ssh {
489 connection: SshConnection {
490 host: self
491 .host
492 .ok_or_else(|| sql_err!("HOST option is required"))?,
493 port: self.port.unwrap_or(22_u16),
494 user: match self
495 .user
496 .ok_or_else(|| sql_err!("USER option is required"))?
497 {
498 StringOrSecret::String(user) => user,
499 StringOrSecret::Secret(_) => {
500 sql_bail!(
501 "SSH connections do not support supplying USER value as SECRET"
502 )
503 }
504 },
505 },
506 key_1: ensure_key(self.public_key1)?,
507 key_2: ensure_key(self.public_key2)?,
508 }
509 }
510 CreateConnectionType::MySql => {
511 let aws_connection = get_aws_connection_reference(scx, &self)?;
512 if aws_connection.is_some() && self.password.is_some() {
513 sql_bail!(
514 "invalid CONNECTION: AWS IAM authentication is not supported with password"
515 );
516 }
517
518 let cert = self.ssl_certificate;
519 let key = self.ssl_key.map(|secret| secret.into());
520 let tls_identity = match (cert, key) {
521 (None, None) => None,
522 (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
523 _ => sql_bail!(
524 "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
525 ),
526 };
527 let tls_mode = match self
530 .ssl_mode
531 .map(|f| f.to_uppercase())
532 .as_ref()
533 .map(|m| m.as_str())
534 {
535 None | Some("DISABLED") => {
536 if aws_connection.is_some() {
537 sql_bail!(
538 "invalid CONNECTION: AWS IAM authentication requires SSL to be enabled"
539 )
540 }
541 MySqlSslMode::Disabled
542 }
543 Some("REQUIRED") | Some("REQUIRE") => MySqlSslMode::Required,
546 Some("VERIFY_CA") | Some("VERIFY-CA") => MySqlSslMode::VerifyCa,
547 Some("VERIFY_IDENTITY") | Some("VERIFY-IDENTITY") => {
548 MySqlSslMode::VerifyIdentity
549 }
550 Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
551 };
552
553 if let Some(privatelink) = self.aws_privatelink.as_ref() {
555 if privatelink.port.is_some() {
556 sql_bail!(
557 "invalid CONNECTION: MYSQL does not support PORT for AWS PRIVATELINK"
558 )
559 }
560 }
561 let tunnel = build_tunnel_definition(
562 scx,
563 self.ssh_tunnel,
564 self.aws_privatelink,
565 None, )?;
567
568 ConnectionDetails::MySql(MySqlConnection {
569 password: self.password.map(|password| password.into()),
570 host: self
571 .host
572 .ok_or_else(|| sql_err!("HOST option is required"))?,
573 port: self.port.unwrap_or(3306_u16),
574 tunnel,
575 tls_mode,
576 tls_root_cert: self.ssl_certificate_authority,
577 tls_identity,
578 user: self
579 .user
580 .ok_or_else(|| sql_err!("USER option is required"))?,
581 aws_connection,
582 })
583 }
584 CreateConnectionType::SqlServer => {
585 let aws_connection = get_aws_connection_reference(scx, &self)?;
586 if aws_connection.is_some() && self.password.is_some() {
587 sql_bail!(
588 "invalid CONNECTION: AWS IAM authentication is not supported with password"
589 );
590 }
591
592 let (encryption, certificate_validation_policy) = match self
593 .ssl_mode
594 .map(|mode| mode.to_uppercase())
595 .as_ref()
596 .map(|mode| mode.as_str())
597 {
598 None | Some("DISABLED") => (
599 mz_sql_server_util::config::EncryptionLevel::None,
600 mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
601 ),
602 Some("REQUIRED") => (
603 mz_sql_server_util::config::EncryptionLevel::Required,
604 mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
605 ),
606 Some("VERIFY") => (
607 mz_sql_server_util::config::EncryptionLevel::Required,
608 mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem,
609 ),
610 Some("VERIFY_CA") => {
611 if self.ssl_certificate_authority.is_none() {
612 sql_bail!(
613 "invalid CONNECTION: SSL MODE 'verify_ca' requires SSL CERTIFICATE AUTHORITY"
614 );
615 }
616 (
617 mz_sql_server_util::config::EncryptionLevel::Required,
618 mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA,
619 )
620 }
621 Some(mode) => {
622 sql_bail!("invalid CONNECTION: unknown SSL MODE {}", mode.quoted())
623 }
624 };
625
626 if let Some(privatelink) = self.aws_privatelink.as_ref() {
627 if privatelink.port.is_some() {
628 sql_bail!(
629 "invalid CONNECTION: SQL SERVER does not support PORT for AWS PRIVATELINK"
630 )
631 }
632 }
633
634 let port = self.port.unwrap_or(1433_u16);
638 let tunnel = build_tunnel_definition(
639 scx,
640 self.ssh_tunnel,
641 self.aws_privatelink,
642 None, )?;
644
645 ConnectionDetails::SqlServer(SqlServerConnectionDetails {
646 host: self
647 .host
648 .ok_or_else(|| sql_err!("HOST option is required"))?,
649 port,
650 database: self
651 .database
652 .ok_or_else(|| sql_err!("DATABASE option is required"))?,
653 user: self
654 .user
655 .ok_or_else(|| sql_err!("USER option is required"))?,
656 password: self
657 .password
658 .ok_or_else(|| sql_err!("PASSWORD option is required"))
659 .map(|pass| pass.into())?,
660 tunnel,
661 encryption,
662 certificate_validation_policy,
663 tls_root_cert: self.ssl_certificate_authority,
664 })
665 }
666 CreateConnectionType::IcebergCatalog => {
667 let catalog_type = self.catalog_type.clone().ok_or_else(|| {
668 sql_err!("invalid CONNECTION: ICEBERG connections must specify CATALOG TYPE")
669 })?;
670
671 let uri: reqwest::Url = match &self.url {
672 Some(url) => url
673 .parse()
674 .map_err(|e| sql_err!("parsing Iceberg catalog url: {e}"))?,
675 None => sql_bail!("invalid CONNECTION: must specify URL"),
676 };
677
678 let warehouse = self.warehouse.clone();
679 let credential = self.credential.clone();
680 let aws_connection = get_aws_connection_reference(scx, &self)?;
681
682 let catalog = match catalog_type {
683 IcebergCatalogType::S3TablesRest => {
684 let Some(warehouse) = warehouse else {
685 sql_bail!(
686 "invalid CONNECTION: ICEBERG s3tablesrest connections must specify WAREHOUSE"
687 );
688 };
689 let Some(aws_connection) = aws_connection else {
690 sql_bail!(
691 "invalid CONNECTION: ICEBERG s3tablesrest connections require an AWS connection"
692 );
693 };
694
695 IcebergCatalogImpl::S3TablesRest(S3TablesRestIcebergCatalog {
696 aws_connection,
697 warehouse,
698 })
699 }
700 IcebergCatalogType::Rest => {
701 let Some(credential) = credential else {
702 sql_bail!(
703 "invalid CONNECTION: ICEBERG rest connections require a CREDENTIAL"
704 );
705 };
706
707 IcebergCatalogImpl::Rest(RestIcebergCatalog {
708 credential,
709 scope: self.scope.clone(),
710 warehouse,
711 })
712 }
713 };
714
715 ConnectionDetails::IcebergCatalog(IcebergCatalogConnection { catalog, uri })
716 }
717 };
718
719 Ok(connection)
720 }
721
722 pub fn get_brokers_and_rules(
723 &self,
724 scx: &StatementContext,
725 ) -> Result<
726 (
727 Vec<mz_storage_types::connections::KafkaBroker<ReferencedConnection>>,
728 Vec<KafkaMatchingBrokerRule<Aug>>,
729 ),
730 PlanError,
731 > {
732 let mut all_brokers: Vec<KafkaBroker<Aug>> = vec![];
734 let mut matching_rules: Vec<KafkaMatchingBrokerRule<Aug>> = vec![];
735
736 match (&self.broker, &self.brokers, &self.aws_privatelink) {
738 (Some(broker), None, None) => all_brokers.extend(broker.iter().cloned()),
740 (None, Some(broker_list), None) => {
742 all_brokers.extend(broker_list.static_entries.iter().cloned());
743 matching_rules.extend(broker_list.matching_rules.iter().cloned());
744 }
745 (None, None, Some(_privatelink)) => {
747 }
749 (None, None, None) => {
751 sql_bail!("invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK")
752 }
753 _ => sql_bail!(
755 "invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK"
756 ),
757 };
758
759 if !matching_rules.is_empty() && all_brokers.is_empty() {
761 sql_bail!(
762 "invalid CONNECTION: BROKERS must contain at least one static broker address"
763 );
764 }
765
766 let mut out = vec![];
769 for broker in &all_brokers {
770 if broker.address.contains(',') {
771 sql_bail!(
772 "invalid CONNECTION: cannot specify multiple Kafka broker addresses in one string.\n\nInstead, specify BROKERS using multiple strings, e.g. BROKERS ('kafka:9092', 'kafka:9093')"
773 );
774 }
775
776 let tunnel = match &broker.tunnel {
777 KafkaBrokerTunnel::Direct => Tunnel::Direct,
778 KafkaBrokerTunnel::AwsPrivatelink(aws_privatelink) => {
779 Tunnel::AwsPrivatelink(plan_privatelink(scx, aws_privatelink)?)
780 }
781 KafkaBrokerTunnel::SshTunnel(ssh) => {
782 let id = match &ssh {
783 ResolvedItemName::Item { id, .. } => id,
784 _ => sql_bail!(
785 "internal error: Kafka SSH tunnel connection was not resolved"
786 ),
787 };
788 let ssh_tunnel = scx.catalog.get_item(id);
789 match ssh_tunnel.connection()? {
790 Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
791 connection_id: *id,
792 connection: *id,
793 }),
794 _ => {
795 sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item)
796 }
797 }
798 }
799 };
800
801 out.push(mz_storage_types::connections::KafkaBroker {
802 address: broker.address.clone(),
803 tunnel,
804 });
805 }
806
807 Ok((out, matching_rules))
808 }
809}
810
811fn get_aws_connection_reference(
812 scx: &StatementContext,
813 conn_options: &ConnectionOptionExtracted,
814) -> Result<Option<AwsConnectionReference<ReferencedConnection>>, PlanError> {
815 let Some(aws_connection_id) = conn_options.aws_connection else {
816 return Ok(None);
817 };
818
819 let id = CatalogItemId::from(aws_connection_id);
820 let item = scx.catalog.get_item(&id);
821 Ok(match item.connection()? {
822 Connection::Aws(_) => Some(AwsConnectionReference {
823 connection_id: id,
824 connection: id,
825 }),
826 _ => sql_bail!("{} is not an AWS connection", item.name().item),
827 })
828}
829
830fn plan_kafka_security(
831 scx: &StatementContext,
832 v: &ConnectionOptionExtracted,
833) -> Result<
834 (
835 Option<KafkaTlsConfig>,
836 Option<KafkaSaslConfig<ReferencedConnection>>,
837 ),
838 PlanError,
839> {
840 const SASL_CONFIGS: [ConnectionOptionName; 4] = [
841 ConnectionOptionName::AwsConnection,
842 ConnectionOptionName::SaslMechanisms,
843 ConnectionOptionName::SaslUsername,
844 ConnectionOptionName::SaslPassword,
845 ];
846
847 const ALL_CONFIGS: [ConnectionOptionName; 7] = concat_arrays!(
848 [
849 ConnectionOptionName::SslKey,
850 ConnectionOptionName::SslCertificate,
851 ConnectionOptionName::SslCertificateAuthority,
852 ],
853 SASL_CONFIGS
854 );
855
856 enum SecurityProtocol {
857 Plaintext,
858 Ssl,
859 SaslPlaintext,
860 SaslSsl,
861 }
862
863 let security_protocol = v.security_protocol.as_ref().map(|v| v.to_uppercase());
864 let security_protocol = match security_protocol.as_deref() {
865 Some("PLAINTEXT") => SecurityProtocol::Plaintext,
866 Some("SSL") => SecurityProtocol::Ssl,
867 Some("SASL_PLAINTEXT") => SecurityProtocol::SaslPlaintext,
868 Some("SASL_SSL") => SecurityProtocol::SaslSsl,
869 Some(p) => sql_bail!("unknown security protocol: {}", p),
870 None if SASL_CONFIGS.iter().any(|c| v.seen.contains(c)) => SecurityProtocol::SaslSsl,
875 None => SecurityProtocol::Ssl,
876 };
877
878 let mut outstanding = ALL_CONFIGS
879 .into_iter()
880 .filter(|c| v.seen.contains(c))
881 .collect::<BTreeSet<ConnectionOptionName>>();
882
883 let tls = match security_protocol {
884 SecurityProtocol::Ssl | SecurityProtocol::SaslSsl => {
885 outstanding.remove(&ConnectionOptionName::SslCertificate);
886 let identity = match &v.ssl_certificate {
887 None => None,
888 Some(cert) => {
889 outstanding.remove(&ConnectionOptionName::SslKey);
890 let Some(key) = &v.ssl_key else {
891 sql_bail!("SSL KEY must be specified with SSL CERTIFICATE");
892 };
893 Some(TlsIdentity {
894 cert: cert.clone(),
895 key: (*key).into(),
896 })
897 }
898 };
899 outstanding.remove(&ConnectionOptionName::SslCertificateAuthority);
900 Some(KafkaTlsConfig {
901 identity,
902 root_cert: v.ssl_certificate_authority.clone(),
903 })
904 }
905 _ => None,
906 };
907
908 let sasl = match security_protocol {
909 SecurityProtocol::SaslPlaintext | SecurityProtocol::SaslSsl => {
910 outstanding.remove(&ConnectionOptionName::AwsConnection);
911 match get_aws_connection_reference(scx, v)? {
912 Some(aws) => Some(KafkaSaslConfig {
913 mechanism: "OAUTHBEARER".into(),
914 username: "".into(),
915 password: None,
916 aws: Some(aws),
917 }),
918 None => {
919 outstanding.remove(&ConnectionOptionName::SaslMechanisms);
920 outstanding.remove(&ConnectionOptionName::SaslUsername);
921 outstanding.remove(&ConnectionOptionName::SaslPassword);
922 let Some(mechanism) = &v.sasl_mechanisms else {
925 sql_bail!("SASL MECHANISMS must be specified");
926 };
927 let Some(username) = &v.sasl_username else {
928 sql_bail!("SASL USERNAME must be specified");
929 };
930 let Some(password) = &v.sasl_password else {
931 sql_bail!("SASL PASSWORD must be specified");
932 };
933 Some(KafkaSaslConfig {
934 mechanism: mechanism.to_uppercase(),
945 username: username.clone(),
946 password: Some((*password).into()),
947 aws: None,
948 })
949 }
950 }
951 }
952 _ => None,
953 };
954
955 if let Some(outstanding) = outstanding.first() {
956 sql_bail!("option {outstanding} not supported with this configuration");
957 }
958
959 Ok((tls, sasl))
960}
961pub fn plan_default_privatelink(
962 scx: &StatementContext,
963 pl: &mz_sql_parser::ast::ConnectionDefaultAwsPrivatelink<Aug>,
964) -> Result<AwsPrivatelink, PlanError> {
965 let id = pl.connection.item_id().clone();
966 let entry = scx.catalog.get_item(&id);
967 match entry.connection()? {
968 Connection::AwsPrivatelink(_) => Ok(AwsPrivatelink {
969 connection_id: id,
970 availability_zone: None,
972 port: pl.port,
974 }),
975 _ => sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item),
976 }
977}
978
979pub fn plan_privatelink(
980 scx: &StatementContext,
981 pl: &mz_sql_parser::ast::KafkaBrokerAwsPrivatelink<Aug>,
982) -> Result<AwsPrivatelink, PlanError> {
983 let KafkaBrokerAwsPrivatelinkOptionExtracted {
984 availability_zone,
985 port,
986 seen: _,
987 } = KafkaBrokerAwsPrivatelinkOptionExtracted::try_from(pl.options.clone())?;
988
989 let id = match &pl.connection {
990 ResolvedItemName::Item { id, .. } => id,
991 _ => sql_bail!("internal error: Kafka PrivateLink connection was not resolved"),
992 };
993 let entry = scx.catalog.get_item(id);
994 match entry.connection()? {
995 Connection::AwsPrivatelink(connection) => {
996 if let Some(az) = &availability_zone {
997 if !connection.availability_zones.contains(az) {
998 sql_bail!(
999 "AWS PrivateLink availability zone {} does not match any of the \
1000 availability zones on the AWS PrivateLink connection {}",
1001 az.quoted(),
1002 scx.catalog
1003 .resolve_full_name(entry.name())
1004 .to_string()
1005 .quoted()
1006 )
1007 }
1008 }
1009 Ok(AwsPrivatelink {
1010 connection_id: *id,
1011 availability_zone,
1012 port,
1013 })
1014 }
1015 _ => {
1016 sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item)
1017 }
1018 }
1019}
1020
1021pub(crate) fn build_tunnel_definition(
1022 scx: &StatementContext,
1023 ssh_tunnel: Option<with_options::Object>,
1024 aws_privatelink: Option<ConnectionDefaultAwsPrivatelink<Aug>>,
1025 matching_rules: Option<Vec<KafkaMatchingBrokerRule<Aug>>>,
1026) -> Result<Tunnel<ReferencedConnection>, PlanError> {
1027 Ok(match (ssh_tunnel, aws_privatelink, matching_rules) {
1028 (None, None, None) => Tunnel::Direct,
1029 (Some(ssh_tunnel), None, None) => {
1030 let id = CatalogItemId::from(ssh_tunnel);
1031 let ssh_tunnel = scx.catalog.get_item(&id);
1032 match ssh_tunnel.connection()? {
1033 Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
1034 connection_id: id,
1035 connection: id,
1036 }),
1037 _ => sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item),
1038 }
1039 }
1040 (None, Some(aws_privatelink), None) => {
1041 Tunnel::AwsPrivatelink(plan_default_privatelink(scx, &aws_privatelink)?)
1042 }
1043 (None, None, Some(rules)) => {
1044 if rules.is_empty() {
1045 sql_bail!("BROKERS MATCHING rules list cannot be empty");
1046 }
1047
1048 let rules = rules
1049 .iter()
1050 .map(|rule| {
1051 Ok(AwsPrivatelinkRule {
1052 pattern: rule.pattern.clone(),
1053 to: plan_privatelink(scx, &rule.tunnel)?,
1054 })
1055 })
1056 .collect::<Result<Vec<_>, PlanError>>()?;
1057 Tunnel::AwsPrivatelinks(mz_storage_types::connections::AwsPrivatelinks { rules })
1058 }
1059 _ => {
1060 sql_bail!("cannot specify both SSH TUNNEL and AWS PRIVATELINK");
1061 }
1062 })
1063}