1use std::collections::{BTreeMap, BTreeSet};
13
14use anyhow::Context;
15use array_concat::concat_arrays;
16use itertools::Itertools;
17use maplit::btreemap;
18use mz_ore::num::NonNeg;
19use mz_ore::str::StrExt;
20use mz_repr::CatalogItemId;
21use mz_sql_parser::ast::ConnectionOptionName::*;
22use mz_sql_parser::ast::display::AstDisplay;
23use mz_sql_parser::ast::{
24 ConnectionDefaultAwsPrivatelink, ConnectionOption, ConnectionOptionName, CreateConnectionType,
25 KafkaBroker, KafkaBrokerAwsPrivatelinkOption, KafkaBrokerAwsPrivatelinkOptionName,
26 KafkaBrokerTunnel, KafkaMatchingBrokerRule,
27};
28use mz_ssh_util::keys::SshKeyPair;
29use mz_storage_types::connections::aws::{
30 AwsAssumeRole, AwsAuth, AwsConnection, AwsConnectionReference, AwsCredentials,
31};
32use mz_storage_types::connections::gcp::{GcpConnection, GcpConnectionReference};
33use mz_storage_types::connections::inline::ReferencedConnection;
34use mz_storage_types::connections::string_or_secret::StringOrSecret;
35use mz_storage_types::connections::{
36 AwsPrivatelink, AwsPrivatelinkConnection, AwsPrivatelinkRule, CsrConnection,
37 CsrConnectionHttpAuth, GlueSchemaRegistryConnection, IcebergCatalogAuth,
38 IcebergCatalogConnection, IcebergCatalogImpl, IcebergCatalogType, KafkaConnection,
39 KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, MySqlConnection, MySqlSslMode,
40 PostgresConnection, RestIcebergCatalog, S3TablesRestIcebergCatalog, SqlServerConnectionDetails,
41 SshConnection, SshTunnel, TlsIdentity, Tunnel,
42};
43
44use crate::names::Aug;
45use crate::plan::statement::{Connection, ResolvedItemName};
46use crate::plan::with_options::{self};
47use crate::plan::{ConnectionDetails, PlanError, SshKey, StatementContext};
48use crate::session::vars;
49
// Declares, for every `ConnectionOption` name, the Rust type its value is extracted into.
//
// The `ConnectionOptionExtracted` type implemented later in this file reads these values
// through snake_case fields (e.g. `access_key_id`, `ssl_certificate_authority`), each an
// `Option`, plus a `seen` set of the option names that were supplied.
// NOTE(review): the macro definition is not visible here; the description above is inferred
// from how the generated type is used in this file -- confirm against the macro.
generate_extracted_config!(
    ConnectionOption,
    (AccessKeyId, StringOrSecret),
    (AssumeRoleArn, String),
    (AssumeRoleSessionName, String),
    (AvailabilityZones, Vec<String>),
    (AwsConnection, with_options::Object),
    (AwsPrivatelink, ConnectionDefaultAwsPrivatelink<Aug>),
    (Broker, Vec<KafkaBroker<Aug>>),
    (Brokers, with_options::BrokersList),
    (Credential, StringOrSecret),
    (Database, String),
    (Endpoint, String),
    (GcpConnection, with_options::Object),
    (Host, String),
    (Password, with_options::Secret),
    (Port, u16),
    (ProgressTopic, String),
    (ProgressTopicReplicationFactor, i32),
    (PublicKey1, String),
    (PublicKey2, String),
    (Region, String),
    (Registry, String),
    (SaslMechanisms, String),
    (SaslPassword, with_options::Secret),
    (SaslUsername, StringOrSecret),
    (Scope, String),
    (SecretAccessKey, with_options::Secret),
    (SecurityProtocol, String),
    (ServiceAccountKey, with_options::Secret),
    (ServiceName, String),
    (SshTunnel, with_options::Object),
    (SslCertificate, StringOrSecret),
    (SslCertificateAuthority, StringOrSecret),
    (SslKey, with_options::Secret),
    (SslMode, String),
    (SessionToken, StringOrSecret),
    (CatalogType, IcebergCatalogType),
    (Url, String),
    (User, StringOrSecret),
    (Warehouse, String)
);
92
// Declares the per-broker AWS PrivateLink options and the types they are extracted into.
//
// `plan_privatelink` in this file destructures the resulting
// `KafkaBrokerAwsPrivatelinkOptionExtracted` into `availability_zone`, `port` and `seen`.
generate_extracted_config!(
    KafkaBrokerAwsPrivatelinkOption,
    (AvailabilityZone, String),
    (Port, u16)
);
98
/// Connection options grouped as "inalterable": the progress topic name and its replication
/// factor.
// NOTE(review): presumably these are rejected when altering an existing connection; the
// enforcement site is not visible in this part of the file -- confirm against callers.
pub(crate) const INALTERABLE_OPTIONS: &[ConnectionOptionName] =
    &[ProgressTopic, ProgressTopicReplicationFactor];
102
/// Groups of connection options that belong to the same mutually exclusive set; currently the
/// only group is `BROKER` / `BROKERS`.
// NOTE(review): presumably at most one option per inner slice may be present on a connection;
// the code consuming this constant is not visible here -- confirm against callers.
pub(crate) const MUTUALLY_EXCLUSIVE_SETS: &[&[ConnectionOptionName]] = &[&[Broker, Brokers]];
105
106pub(super) fn validate_options_per_connection_type(
107 t: CreateConnectionType,
108 mut options: BTreeSet<ConnectionOptionName>,
109) -> Result<(), PlanError> {
110 use mz_sql_parser::ast::ConnectionOptionName::*;
111 let permitted_options = match t {
112 CreateConnectionType::Aws => [
113 AccessKeyId,
114 SecretAccessKey,
115 SessionToken,
116 Endpoint,
117 Region,
118 AssumeRoleArn,
119 AssumeRoleSessionName,
120 ]
121 .as_slice(),
122 CreateConnectionType::AwsPrivatelink => &[AvailabilityZones, Port, ServiceName],
123 CreateConnectionType::GlueSchemaRegistry => &[AwsConnection, Registry],
124 CreateConnectionType::Gcp => &[ServiceAccountKey],
125 CreateConnectionType::Csr => &[
126 AwsPrivatelink,
127 Password,
128 Port,
129 SshTunnel,
130 SslCertificate,
131 SslCertificateAuthority,
132 SslKey,
133 Url,
134 User,
135 ],
136 CreateConnectionType::Kafka => &[
137 AwsConnection,
138 Broker,
139 Brokers,
140 ProgressTopic,
141 ProgressTopicReplicationFactor,
142 AwsPrivatelink,
143 SshTunnel,
144 SslKey,
145 SslCertificate,
146 SslCertificateAuthority,
147 SaslMechanisms,
148 SaslUsername,
149 SaslPassword,
150 SecurityProtocol,
151 ],
152 CreateConnectionType::Postgres => &[
153 AwsPrivatelink,
154 Database,
155 Host,
156 Password,
157 Port,
158 SshTunnel,
159 SslCertificate,
160 SslCertificateAuthority,
161 SslKey,
162 SslMode,
163 User,
164 ],
165 CreateConnectionType::Ssh => &[Host, Port, User, PublicKey1, PublicKey2],
166 CreateConnectionType::MySql => &[
167 AwsPrivatelink,
168 Host,
169 Password,
170 Port,
171 SshTunnel,
172 SslCertificate,
173 SslCertificateAuthority,
174 SslKey,
175 SslMode,
176 User,
177 AwsConnection,
178 ],
179 CreateConnectionType::SqlServer => &[
180 AwsPrivatelink,
181 Database,
182 Host,
183 Password,
184 Port,
185 SshTunnel,
186 SslCertificate,
187 SslCertificateAuthority,
188 SslKey,
189 SslMode,
190 User,
191 ],
192 CreateConnectionType::IcebergCatalog => &[
193 AwsConnection,
194 CatalogType,
195 Credential,
196 GcpConnection,
197 Scope,
198 Url,
199 Warehouse,
200 ],
201 };
202
203 for o in permitted_options {
204 options.remove(o);
205 }
206
207 if !options.is_empty() {
208 sql_bail!(
209 "{} connections do not support {} values",
210 t,
211 options.iter().join(", ")
212 )
213 }
214
215 Ok(())
216}
217
218impl ConnectionOptionExtracted {
219 pub(super) fn ensure_only_valid_options(
220 &self,
221 t: CreateConnectionType,
222 ) -> Result<(), PlanError> {
223 validate_options_per_connection_type(t, self.seen.clone())
224 }
225
    /// Validates the extracted options for `connection_type` and converts them into the
    /// corresponding [`ConnectionDetails`] variant.
    ///
    /// The options are first checked against the permitted list for `connection_type`; each
    /// arm of the match below then enforces its own required options and cross-option rules.
    ///
    /// # Errors
    ///
    /// Returns a [`PlanError`] when an option is not permitted for `connection_type`, when a
    /// required option is missing, when option values are invalid or mutually inconsistent,
    /// or when a referenced catalog item is not a connection of the expected kind.
    pub fn try_into_connection_details(
        self,
        scx: &StatementContext,
        connection_type: CreateConnectionType,
    ) -> Result<ConnectionDetails, PlanError> {
        self.ensure_only_valid_options(connection_type)?;

        let connection: ConnectionDetails = match connection_type {
            CreateConnectionType::Aws => {
                // Static credentials need both the key ID and the secret key; the session
                // token is optional. Supplying only part of the pair is an error.
                let credentials = match (
                    self.access_key_id,
                    self.secret_access_key,
                    self.session_token,
                ) {
                    (Some(access_key_id), Some(secret_access_key), session_token) => {
                        Some(AwsCredentials {
                            access_key_id,
                            secret_access_key: secret_access_key.into(),
                            session_token,
                        })
                    }
                    (None, None, None) => None,
                    _ => {
                        sql_bail!(
                            "must specify both ACCESS KEY ID and SECRET ACCESS KEY with optional SESSION TOKEN"
                        );
                    }
                };

                // A session name is only meaningful together with a role ARN.
                let assume_role = match (self.assume_role_arn, self.assume_role_session_name) {
                    (Some(arn), session_name) => Some(AwsAssumeRole { arn, session_name }),
                    (None, Some(_)) => {
                        sql_bail!(
                            "must specify ASSUME ROLE ARN with optional ASSUME ROLE SESSION NAME"
                        );
                    }
                    _ => None,
                };

                // Exactly one authentication method must be configured.
                let auth = match (credentials, assume_role) {
                    (None, None) => sql_bail!(
                        "must specify either ASSUME ROLE ARN or ACCESS KEY ID and SECRET ACCESS KEY"
                    ),
                    (Some(credentials), None) => AwsAuth::Credentials(credentials),
                    (None, Some(assume_role)) => AwsAuth::AssumeRole(assume_role),
                    (Some(_), Some(_)) => {
                        sql_bail!("cannot specify both ACCESS KEY ID and ASSUME ROLE ARN");
                    }
                };

                ConnectionDetails::Aws(AwsConnection {
                    auth,
                    // An empty ENDPOINT string is treated the same as no endpoint at all.
                    endpoint: match self.endpoint {
                        Some(endpoint) if !endpoint.is_empty() => Some(endpoint),
                        _ => None,
                    },
                    region: self.region,
                })
            }
            CreateConnectionType::AwsPrivatelink => {
                let connection = AwsPrivatelinkConnection {
                    service_name: self
                        .service_name
                        .ok_or_else(|| sql_err!("SERVICE NAME option is required"))?,
                    availability_zones: self
                        .availability_zones
                        .ok_or_else(|| sql_err!("AVAILABILITY ZONES option is required"))?,
                };
                // Availability zones are only validated when the catalog reports a list of
                // supported zones.
                if let Some(supported_azs) = scx.catalog.aws_privatelink_availability_zones() {
                    let mut unique_azs: BTreeSet<String> = BTreeSet::new();
                    let mut duplicate_azs: BTreeSet<String> = BTreeSet::new();
                    for connection_az in &connection.availability_zones {
                        if unique_azs.contains(connection_az) {
                            duplicate_azs.insert(connection_az.to_string());
                        } else {
                            unique_azs.insert(connection_az.to_string());
                        }
                        // The first unsupported zone aborts immediately; duplicates are only
                        // reported once every zone has been checked.
                        if !supported_azs.contains(connection_az) {
                            return Err(PlanError::InvalidPrivatelinkAvailabilityZone {
                                name: connection_az.to_string(),
                                supported_azs,
                            });
                        }
                    }
                    if duplicate_azs.len() > 0 {
                        return Err(PlanError::DuplicatePrivatelinkAvailabilityZone {
                            duplicate_azs,
                        });
                    }
                }
                ConnectionDetails::AwsPrivatelink(connection)
            }
            CreateConnectionType::Gcp => {
                let credentials_json = self
                    .service_account_key
                    .ok_or_else(|| sql_err!("SERVICE ACCOUNT KEY option is required"))?
                    .into();
                ConnectionDetails::Gcp(GcpConnection { credentials_json })
            }
            CreateConnectionType::Kafka => {
                let (tls, sasl) = plan_kafka_security(scx, &self)?;
                let (static_brokers, matching_rules) = self.get_brokers_and_rules(scx)?;

                // Matching rules are gated behind a feature flag.
                if !matching_rules.is_empty() {
                    scx.require_feature_flag(&vars::ENABLE_KAFKA_BROKER_MATCHING_RULES)?;
                }

                ConnectionDetails::Kafka(KafkaConnection {
                    brokers: static_brokers,
                    // An empty rule list is passed as `None` so that it does not count as a
                    // configured tunnel.
                    default_tunnel: build_tunnel_definition(
                        scx,
                        self.ssh_tunnel,
                        self.aws_privatelink,
                        if matching_rules.is_empty() { None } else { Some(matching_rules) },
                    )?,
                    progress_topic: self.progress_topic,
                    progress_topic_options: KafkaTopicOptions {
                        // The partition count is fixed at 1.
                        partition_count: Some(NonNeg::try_from(1).expect("1 is positive")),
                        // Zero and negative replication factors are rejected explicitly before
                        // converting to `NonNeg`.
                        replication_factor: self.progress_topic_replication_factor.map(|val| {
                            if val <= 0 {
                                Err(sql_err!("invalid CONNECTION: PROGRESS TOPIC REPLICATION FACTOR must be greater than 0"))?
                            }
                            NonNeg::try_from(val).map_err(|e| sql_err!("{e}"))
                        }).transpose()?,
                        topic_config: btreemap! {
                            "cleanup.policy".to_string() => "compact".to_string(),
                            // 134217728 bytes = 128 MiB.
                            "segment.bytes".to_string() => "134217728".to_string(),
                        },
                    },
                    options: BTreeMap::new(),
                    tls,
                    sasl,
                })
            }
            CreateConnectionType::Csr => {
                let url: reqwest::Url = match self.url {
                    Some(url) => url
                        .parse()
                        .map_err(|e| sql_err!("parsing schema registry url: {e}"))?,
                    None => sql_bail!("invalid CONNECTION: must specify URL"),
                };
                // The host itself is not needed here; this only verifies that one is present.
                let _ = url
                    .host_str()
                    .ok_or_else(|| sql_err!("invalid CONNECTION: URL must specify domain name"))?;
                if url.path() != "/" {
                    sql_bail!("invalid CONNECTION: URL must have an empty path");
                }
                // Client TLS authentication needs both the certificate and its key.
                let cert = self.ssl_certificate;
                let key = self.ssl_key.map(|secret| secret.into());
                let tls_identity = match (cert, key) {
                    (None, None) => None,
                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
                    _ => sql_bail!(
                        "invalid CONNECTION: reading from SSL-auth Confluent Schema Registry requires both SSL KEY and SSL CERTIFICATE"
                    ),
                };
                // HTTP auth is only configured when USER is given; PASSWORD alone is ignored.
                let http_auth = self.user.map(|username| CsrConnectionHttpAuth {
                    username,
                    password: self.password.map(|secret| secret.into()),
                });

                if let Some(privatelink) = self.aws_privatelink.as_ref() {
                    if privatelink.port.is_some() {
                        sql_bail!(
                            "invalid CONNECTION: CONFLUENT SCHEMA REGISTRY does not support PORT for AWS PRIVATELINK"
                        )
                    }
                }
                let tunnel = build_tunnel_definition(
                    scx,
                    self.ssh_tunnel,
                    self.aws_privatelink,
                    None,
                )?;

                ConnectionDetails::Csr(CsrConnection {
                    url,
                    tls_root_cert: self.ssl_certificate_authority,
                    tls_identity,
                    http_auth,
                    tunnel,
                })
            }
            CreateConnectionType::GlueSchemaRegistry => {
                scx.require_feature_flag(&vars::ENABLE_GLUE_SCHEMA_REGISTRY)?;

                let aws_connection = get_aws_connection_reference(scx, &self)?
                    .ok_or_else(|| sql_err!("AWS CONNECTION option is required"))?;
                let registry_name = self
                    .registry
                    .ok_or_else(|| sql_err!("REGISTRY option is required"))?;
                if registry_name.is_empty() {
                    sql_bail!("invalid CONNECTION: REGISTRY must not be empty");
                }

                ConnectionDetails::GlueSchemaRegistry(GlueSchemaRegistryConnection {
                    aws_connection,
                    registry_name,
                })
            }
            CreateConnectionType::Postgres => {
                // Client TLS authentication needs both the certificate and its key.
                let cert = self.ssl_certificate;
                let key = self.ssl_key.map(|secret| secret.into());
                let tls_identity = match (cert, key) {
                    (None, None) => None,
                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
                    _ => sql_bail!(
                        "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
                    ),
                };
                // SSL MODE is matched case-sensitively here; several spellings are accepted
                // for each mode, and an absent mode means TLS is disabled.
                let tls_mode = match self.ssl_mode.as_ref().map(|m| m.as_str()) {
                    None | Some("disable") => tokio_postgres::config::SslMode::Disable,
                    Some("require") | Some("required") => tokio_postgres::config::SslMode::Require,
                    Some("verify_ca") | Some("verify-ca") => {
                        tokio_postgres::config::SslMode::VerifyCa
                    }
                    Some("verify_full") | Some("verify-full") => {
                        tokio_postgres::config::SslMode::VerifyFull
                    }
                    Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
                };

                if let Some(privatelink) = self.aws_privatelink.as_ref() {
                    if privatelink.port.is_some() {
                        sql_bail!(
                            "invalid CONNECTION: POSTGRES does not support PORT for AWS PRIVATELINK"
                        )
                    }
                }
                let tunnel = build_tunnel_definition(
                    scx,
                    self.ssh_tunnel,
                    self.aws_privatelink,
                    None,
                )?;

                ConnectionDetails::Postgres(PostgresConnection {
                    database: self
                        .database
                        .ok_or_else(|| sql_err!("DATABASE option is required"))?,
                    password: self.password.map(|password| password.into()),
                    host: self
                        .host
                        .ok_or_else(|| sql_err!("HOST option is required"))?,
                    // PORT defaults to 5432 when omitted.
                    port: self.port.unwrap_or(5432_u16),
                    tunnel,
                    tls_mode,
                    tls_root_cert: self.ssl_certificate_authority,
                    tls_identity,
                    user: self
                        .user
                        .ok_or_else(|| sql_err!("USER option is required"))?,
                })
            }
            CreateConnectionType::Ssh => {
                // A supplied public key is used as-is; otherwise a new key pair is created.
                let ensure_key = |public_key| match public_key {
                    Some(public_key) => Ok::<_, anyhow::Error>(SshKey::PublicOnly(public_key)),
                    None => {
                        let key = SshKeyPair::new().context("creating SSH key")?;
                        Ok(SshKey::Both(key))
                    }
                };
                ConnectionDetails::Ssh {
                    connection: SshConnection {
                        host: self
                            .host
                            .ok_or_else(|| sql_err!("HOST option is required"))?,
                        // PORT defaults to 22 when omitted.
                        port: self.port.unwrap_or(22_u16),
                        // USER must be a plain string; a secret reference is rejected.
                        user: match self
                            .user
                            .ok_or_else(|| sql_err!("USER option is required"))?
                        {
                            StringOrSecret::String(user) => user,
                            StringOrSecret::Secret(_) => {
                                sql_bail!(
                                    "SSH connections do not support supplying USER value as SECRET"
                                )
                            }
                        },
                    },
                    key_1: ensure_key(self.public_key1)?,
                    key_2: ensure_key(self.public_key2)?,
                }
            }
            CreateConnectionType::MySql => {
                // An AWS connection and a password are mutually exclusive.
                let aws_connection = get_aws_connection_reference(scx, &self)?;
                if aws_connection.is_some() && self.password.is_some() {
                    sql_bail!(
                        "invalid CONNECTION: AWS IAM authentication is not supported with password"
                    );
                }

                // Client TLS authentication needs both the certificate and its key.
                let cert = self.ssl_certificate;
                let key = self.ssl_key.map(|secret| secret.into());
                let tls_identity = match (cert, key) {
                    (None, None) => None,
                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
                    _ => sql_bail!(
                        "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
                    ),
                };
                // SSL MODE is upper-cased before matching, so it is case-insensitive.
                let tls_mode = match self
                    .ssl_mode
                    .map(|f| f.to_uppercase())
                    .as_ref()
                    .map(|m| m.as_str())
                {
                    None | Some("DISABLED") => {
                        // Using an AWS connection with TLS disabled is rejected.
                        if aws_connection.is_some() {
                            sql_bail!(
                                "invalid CONNECTION: AWS IAM authentication requires SSL to be enabled"
                            )
                        }
                        MySqlSslMode::Disabled
                    }
                    Some("REQUIRED") | Some("REQUIRE") => MySqlSslMode::Required,
                    Some("VERIFY_CA") | Some("VERIFY-CA") => MySqlSslMode::VerifyCa,
                    Some("VERIFY_IDENTITY") | Some("VERIFY-IDENTITY") => {
                        MySqlSslMode::VerifyIdentity
                    }
                    Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
                };

                if let Some(privatelink) = self.aws_privatelink.as_ref() {
                    if privatelink.port.is_some() {
                        sql_bail!(
                            "invalid CONNECTION: MYSQL does not support PORT for AWS PRIVATELINK"
                        )
                    }
                }
                let tunnel = build_tunnel_definition(
                    scx,
                    self.ssh_tunnel,
                    self.aws_privatelink,
                    None,
                )?;

                ConnectionDetails::MySql(MySqlConnection {
                    password: self.password.map(|password| password.into()),
                    host: self
                        .host
                        .ok_or_else(|| sql_err!("HOST option is required"))?,
                    // PORT defaults to 3306 when omitted.
                    port: self.port.unwrap_or(3306_u16),
                    tunnel,
                    tls_mode,
                    tls_root_cert: self.ssl_certificate_authority,
                    tls_identity,
                    user: self
                        .user
                        .ok_or_else(|| sql_err!("USER option is required"))?,
                    aws_connection,
                })
            }
            CreateConnectionType::SqlServer => {
                // NOTE(review): AWS CONNECTION is not in the SQL Server permitted-option list
                // checked at the top of this function, so this lookup presumably always
                // yields `None` -- confirm whether the check below is reachable.
                let aws_connection = get_aws_connection_reference(scx, &self)?;
                if aws_connection.is_some() && self.password.is_some() {
                    sql_bail!(
                        "invalid CONNECTION: AWS IAM authentication is not supported with password"
                    );
                }

                // SSL MODE is upper-cased before matching, so it is case-insensitive. Each
                // mode maps to an encryption level plus a certificate validation policy.
                let (encryption, certificate_validation_policy) = match self
                    .ssl_mode
                    .map(|mode| mode.to_uppercase())
                    .as_ref()
                    .map(|mode| mode.as_str())
                {
                    None | Some("DISABLED") => (
                        mz_sql_server_util::config::EncryptionLevel::None,
                        mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
                    ),
                    Some("REQUIRED") => (
                        mz_sql_server_util::config::EncryptionLevel::Required,
                        mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
                    ),
                    Some("VERIFY") => (
                        mz_sql_server_util::config::EncryptionLevel::Required,
                        mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem,
                    ),
                    Some("VERIFY_CA") => {
                        // Verifying against a CA is impossible without the CA certificate.
                        if self.ssl_certificate_authority.is_none() {
                            sql_bail!(
                                "invalid CONNECTION: SSL MODE 'verify_ca' requires SSL CERTIFICATE AUTHORITY"
                            );
                        }
                        (
                            mz_sql_server_util::config::EncryptionLevel::Required,
                            mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA,
                        )
                    }
                    Some(mode) => {
                        sql_bail!("invalid CONNECTION: unknown SSL MODE {}", mode.quoted())
                    }
                };

                if let Some(privatelink) = self.aws_privatelink.as_ref() {
                    if privatelink.port.is_some() {
                        sql_bail!(
                            "invalid CONNECTION: SQL SERVER does not support PORT for AWS PRIVATELINK"
                        )
                    }
                }

                // PORT defaults to 1433 when omitted.
                let port = self.port.unwrap_or(1433_u16);
                let tunnel = build_tunnel_definition(
                    scx,
                    self.ssh_tunnel,
                    self.aws_privatelink,
                    None,
                )?;

                ConnectionDetails::SqlServer(SqlServerConnectionDetails {
                    host: self
                        .host
                        .ok_or_else(|| sql_err!("HOST option is required"))?,
                    port,
                    database: self
                        .database
                        .ok_or_else(|| sql_err!("DATABASE option is required"))?,
                    user: self
                        .user
                        .ok_or_else(|| sql_err!("USER option is required"))?,
                    // Unlike the Postgres and MySQL arms, PASSWORD is mandatory here.
                    password: self
                        .password
                        .ok_or_else(|| sql_err!("PASSWORD option is required"))
                        .map(|pass| pass.into())?,
                    tunnel,
                    encryption,
                    certificate_validation_policy,
                    tls_root_cert: self.ssl_certificate_authority,
                })
            }
            CreateConnectionType::IcebergCatalog => {
                // Fields are cloned rather than moved in this arm because `self` is borrowed
                // as a whole by the connection-reference lookups below.
                let catalog_type = self.catalog_type.clone().ok_or_else(|| {
                    sql_err!("invalid CONNECTION: ICEBERG connections must specify CATALOG TYPE")
                })?;

                let uri: reqwest::Url = match &self.url {
                    Some(url) => url
                        .parse()
                        .map_err(|e| sql_err!("parsing Iceberg catalog url: {e}"))?,
                    None => sql_bail!("invalid CONNECTION: must specify URL"),
                };

                let warehouse = self.warehouse.clone();
                let credential = self.credential.clone();
                let aws_connection = get_aws_connection_reference(scx, &self)?;
                let gcp_connection = get_gcp_connection_reference(scx, &self)?;

                let catalog = match catalog_type {
                    // S3 Tables REST catalogs require an AWS connection and a warehouse, and
                    // reject a GCP connection.
                    IcebergCatalogType::S3TablesRest => {
                        if gcp_connection.is_some() {
                            sql_bail!(
                                "invalid CONNECTION: ICEBERG s3tablesrest connections do not support GCP CONNECTION"
                            );
                        }
                        let Some(warehouse) = warehouse else {
                            sql_bail!(
                                "invalid CONNECTION: ICEBERG s3tablesrest connections must specify WAREHOUSE"
                            );
                        };
                        let Some(aws_connection) = aws_connection else {
                            sql_bail!(
                                "invalid CONNECTION: ICEBERG s3tablesrest connections require an AWS connection"
                            );
                        };

                        IcebergCatalogImpl::S3TablesRest(S3TablesRestIcebergCatalog {
                            aws_connection,
                            warehouse,
                        })
                    }
                    // Generic REST catalogs reject an AWS connection and authenticate with
                    // exactly one of CREDENTIAL or GCP CONNECTION.
                    IcebergCatalogType::Rest => {
                        if aws_connection.is_some() {
                            sql_bail!(
                                "invalid CONNECTION: ICEBERG rest connections do not support AWS CONNECTION.\n\nTry s3tablesrest instead."
                            );
                        }
                        let auth = match (credential, gcp_connection) {
                            (Some(_), Some(_)) => sql_bail!(
                                "invalid CONNECTION: ICEBERG rest connections may set CREDENTIAL or GCP CONNECTION, not both"
                            ),
                            (Some(credential), None) => IcebergCatalogAuth::OAuth {
                                credential,
                                scope: self.scope.clone(),
                            },
                            (None, Some(gcp_connection)) => {
                                // A GCP connection is only accepted when the URL, rendered
                                // back to a string, equals this exact URI.
                                const BIGLAKE_CATALOG_URI: &str =
                                    "https://biglake.googleapis.com/iceberg/v1/restcatalog";
                                if uri.to_string() != BIGLAKE_CATALOG_URI {
                                    sql_bail!(
                                        "GCP connection can only be used with '{}'",
                                        BIGLAKE_CATALOG_URI
                                    );
                                }
                                IcebergCatalogAuth::Gcp(gcp_connection)
                            }
                            (None, None) => sql_bail!(
                                "invalid CONNECTION: ICEBERG rest connections require a CREDENTIAL or GCP CONNECTION"
                            ),
                        };

                        IcebergCatalogImpl::Rest(RestIcebergCatalog { auth, warehouse })
                    }
                };

                ConnectionDetails::IcebergCatalog(IcebergCatalogConnection { catalog, uri })
            }
        };

        Ok(connection)
    }
759
760 pub fn get_brokers_and_rules(
761 &self,
762 scx: &StatementContext,
763 ) -> Result<
764 (
765 Vec<mz_storage_types::connections::KafkaBroker<ReferencedConnection>>,
766 Vec<KafkaMatchingBrokerRule<Aug>>,
767 ),
768 PlanError,
769 > {
770 let mut all_brokers: Vec<KafkaBroker<Aug>> = vec![];
772 let mut matching_rules: Vec<KafkaMatchingBrokerRule<Aug>> = vec![];
773
774 match (&self.broker, &self.brokers, &self.aws_privatelink) {
776 (Some(broker), None, None) => all_brokers.extend(broker.iter().cloned()),
778 (None, Some(broker_list), None) => {
780 all_brokers.extend(broker_list.static_entries.iter().cloned());
781 matching_rules.extend(broker_list.matching_rules.iter().cloned());
782 }
783 (None, None, Some(_privatelink)) => {
785 }
787 (None, None, None) => {
789 sql_bail!("invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK")
790 }
791 _ => sql_bail!(
793 "invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK"
794 ),
795 };
796
797 if !matching_rules.is_empty() && all_brokers.is_empty() {
799 sql_bail!(
800 "invalid CONNECTION: BROKERS must contain at least one static broker address"
801 );
802 }
803
804 let mut out = vec![];
807 for broker in &all_brokers {
808 if broker.address.contains(',') {
809 sql_bail!(
810 "invalid CONNECTION: cannot specify multiple Kafka broker addresses in one string.\n\nInstead, specify BROKERS using multiple strings, e.g. BROKERS ('kafka:9092', 'kafka:9093')"
811 );
812 }
813
814 let tunnel = match &broker.tunnel {
815 KafkaBrokerTunnel::Direct => Tunnel::Direct,
816 KafkaBrokerTunnel::AwsPrivatelink(aws_privatelink) => {
817 Tunnel::AwsPrivatelink(plan_privatelink(scx, aws_privatelink)?)
818 }
819 KafkaBrokerTunnel::SshTunnel(ssh) => {
820 let id = match &ssh {
821 ResolvedItemName::Item { id, .. } => id,
822 _ => sql_bail!(
823 "internal error: Kafka SSH tunnel connection was not resolved"
824 ),
825 };
826 let ssh_tunnel = scx.catalog.get_item(id);
827 match ssh_tunnel.connection()? {
828 Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
829 connection_id: *id,
830 connection: *id,
831 }),
832 _ => {
833 sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item)
834 }
835 }
836 }
837 };
838
839 out.push(mz_storage_types::connections::KafkaBroker {
840 address: broker.address.clone(),
841 tunnel,
842 });
843 }
844
845 Ok((out, matching_rules))
846 }
847}
848
849fn get_aws_connection_reference(
850 scx: &StatementContext,
851 conn_options: &ConnectionOptionExtracted,
852) -> Result<Option<AwsConnectionReference<ReferencedConnection>>, PlanError> {
853 let Some(aws_connection_id) = conn_options.aws_connection else {
854 return Ok(None);
855 };
856
857 let id = CatalogItemId::from(aws_connection_id);
858 let item = scx.catalog.get_item(&id);
859 Ok(match item.connection()? {
860 Connection::Aws(_) => Some(AwsConnectionReference {
861 connection_id: id,
862 connection: id,
863 }),
864 _ => sql_bail!("{} is not an AWS connection", item.name().item),
865 })
866}
867fn get_gcp_connection_reference(
868 scx: &StatementContext,
869 conn_options: &ConnectionOptionExtracted,
870) -> Result<Option<GcpConnectionReference<ReferencedConnection>>, PlanError> {
871 let Some(gcp_connection_id) = conn_options.gcp_connection else {
872 return Ok(None);
873 };
874
875 let id = CatalogItemId::from(gcp_connection_id);
876 let item = scx.catalog.get_item(&id);
877 Ok(match item.connection()? {
878 Connection::Gcp(_) => Some(GcpConnectionReference {
879 connection_id: id,
880 connection: id,
881 }),
882 _ => sql_bail!("{} is not a GCP connection", item.name().item),
883 })
884}
885
886fn plan_kafka_security(
887 scx: &StatementContext,
888 v: &ConnectionOptionExtracted,
889) -> Result<
890 (
891 Option<KafkaTlsConfig>,
892 Option<KafkaSaslConfig<ReferencedConnection>>,
893 ),
894 PlanError,
895> {
896 const SASL_CONFIGS: [ConnectionOptionName; 4] = [
897 ConnectionOptionName::AwsConnection,
898 ConnectionOptionName::SaslMechanisms,
899 ConnectionOptionName::SaslUsername,
900 ConnectionOptionName::SaslPassword,
901 ];
902
903 const ALL_CONFIGS: [ConnectionOptionName; 7] = concat_arrays!(
904 [
905 ConnectionOptionName::SslKey,
906 ConnectionOptionName::SslCertificate,
907 ConnectionOptionName::SslCertificateAuthority,
908 ],
909 SASL_CONFIGS
910 );
911
912 enum SecurityProtocol {
913 Plaintext,
914 Ssl,
915 SaslPlaintext,
916 SaslSsl,
917 }
918
919 let security_protocol = v.security_protocol.as_ref().map(|v| v.to_uppercase());
920 let security_protocol = match security_protocol.as_deref() {
921 Some("PLAINTEXT") => SecurityProtocol::Plaintext,
922 Some("SSL") => SecurityProtocol::Ssl,
923 Some("SASL_PLAINTEXT") => SecurityProtocol::SaslPlaintext,
924 Some("SASL_SSL") => SecurityProtocol::SaslSsl,
925 Some(p) => sql_bail!("unknown security protocol: {}", p),
926 None if SASL_CONFIGS.iter().any(|c| v.seen.contains(c)) => SecurityProtocol::SaslSsl,
931 None => SecurityProtocol::Ssl,
932 };
933
934 let mut outstanding = ALL_CONFIGS
935 .into_iter()
936 .filter(|c| v.seen.contains(c))
937 .collect::<BTreeSet<ConnectionOptionName>>();
938
939 let tls = match security_protocol {
940 SecurityProtocol::Ssl | SecurityProtocol::SaslSsl => {
941 outstanding.remove(&ConnectionOptionName::SslCertificate);
942 let identity = match &v.ssl_certificate {
943 None => None,
944 Some(cert) => {
945 outstanding.remove(&ConnectionOptionName::SslKey);
946 let Some(key) = &v.ssl_key else {
947 sql_bail!("SSL KEY must be specified with SSL CERTIFICATE");
948 };
949 Some(TlsIdentity {
950 cert: cert.clone(),
951 key: (*key).into(),
952 })
953 }
954 };
955 outstanding.remove(&ConnectionOptionName::SslCertificateAuthority);
956 Some(KafkaTlsConfig {
957 identity,
958 root_cert: v.ssl_certificate_authority.clone(),
959 })
960 }
961 _ => None,
962 };
963
964 let sasl = match security_protocol {
965 SecurityProtocol::SaslPlaintext | SecurityProtocol::SaslSsl => {
966 outstanding.remove(&ConnectionOptionName::AwsConnection);
967 match get_aws_connection_reference(scx, v)? {
968 Some(aws) => Some(KafkaSaslConfig {
969 mechanism: "OAUTHBEARER".into(),
970 username: "".into(),
971 password: None,
972 aws: Some(aws),
973 }),
974 None => {
975 outstanding.remove(&ConnectionOptionName::SaslMechanisms);
976 outstanding.remove(&ConnectionOptionName::SaslUsername);
977 outstanding.remove(&ConnectionOptionName::SaslPassword);
978 let Some(mechanism) = &v.sasl_mechanisms else {
981 sql_bail!("SASL MECHANISMS must be specified");
982 };
983 let Some(username) = &v.sasl_username else {
984 sql_bail!("SASL USERNAME must be specified");
985 };
986 let Some(password) = &v.sasl_password else {
987 sql_bail!("SASL PASSWORD must be specified");
988 };
989 Some(KafkaSaslConfig {
990 mechanism: mechanism.to_uppercase(),
1001 username: username.clone(),
1002 password: Some((*password).into()),
1003 aws: None,
1004 })
1005 }
1006 }
1007 }
1008 _ => None,
1009 };
1010
1011 if let Some(outstanding) = outstanding.first() {
1012 sql_bail!("option {outstanding} not supported with this configuration");
1013 }
1014
1015 Ok((tls, sasl))
1016}
1017pub fn plan_default_privatelink(
1018 scx: &StatementContext,
1019 pl: &mz_sql_parser::ast::ConnectionDefaultAwsPrivatelink<Aug>,
1020) -> Result<AwsPrivatelink, PlanError> {
1021 let id = pl.connection.item_id().clone();
1022 let entry = scx.catalog.get_item(&id);
1023 match entry.connection()? {
1024 Connection::AwsPrivatelink(_) => Ok(AwsPrivatelink {
1025 connection_id: id,
1026 availability_zone: None,
1028 port: pl.port,
1030 }),
1031 _ => sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item),
1032 }
1033}
1034
1035pub fn plan_privatelink(
1036 scx: &StatementContext,
1037 pl: &mz_sql_parser::ast::KafkaBrokerAwsPrivatelink<Aug>,
1038) -> Result<AwsPrivatelink, PlanError> {
1039 let KafkaBrokerAwsPrivatelinkOptionExtracted {
1040 availability_zone,
1041 port,
1042 seen: _,
1043 } = KafkaBrokerAwsPrivatelinkOptionExtracted::try_from(pl.options.clone())?;
1044
1045 let id = match &pl.connection {
1046 ResolvedItemName::Item { id, .. } => id,
1047 _ => sql_bail!("internal error: Kafka PrivateLink connection was not resolved"),
1048 };
1049 let entry = scx.catalog.get_item(id);
1050 match entry.connection()? {
1051 Connection::AwsPrivatelink(connection) => {
1052 if let Some(az) = &availability_zone {
1053 if !connection.availability_zones.contains(az) {
1054 sql_bail!(
1055 "AWS PrivateLink availability zone {} does not match any of the \
1056 availability zones on the AWS PrivateLink connection {}",
1057 az.quoted(),
1058 scx.catalog
1059 .resolve_full_name(entry.name())
1060 .to_string()
1061 .quoted()
1062 )
1063 }
1064 }
1065 Ok(AwsPrivatelink {
1066 connection_id: *id,
1067 availability_zone,
1068 port,
1069 })
1070 }
1071 _ => {
1072 sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item)
1073 }
1074 }
1075}
1076
1077pub(crate) fn build_tunnel_definition(
1078 scx: &StatementContext,
1079 ssh_tunnel: Option<with_options::Object>,
1080 aws_privatelink: Option<ConnectionDefaultAwsPrivatelink<Aug>>,
1081 matching_rules: Option<Vec<KafkaMatchingBrokerRule<Aug>>>,
1082) -> Result<Tunnel<ReferencedConnection>, PlanError> {
1083 Ok(match (ssh_tunnel, aws_privatelink, matching_rules) {
1084 (None, None, None) => Tunnel::Direct,
1085 (Some(ssh_tunnel), None, None) => {
1086 let id = CatalogItemId::from(ssh_tunnel);
1087 let ssh_tunnel = scx.catalog.get_item(&id);
1088 match ssh_tunnel.connection()? {
1089 Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
1090 connection_id: id,
1091 connection: id,
1092 }),
1093 _ => sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item),
1094 }
1095 }
1096 (None, Some(aws_privatelink), None) => {
1097 Tunnel::AwsPrivatelink(plan_default_privatelink(scx, &aws_privatelink)?)
1098 }
1099 (None, None, Some(rules)) => {
1100 if rules.is_empty() {
1101 sql_bail!("BROKERS MATCHING rules list cannot be empty");
1102 }
1103
1104 let rules = rules
1105 .iter()
1106 .map(|rule| {
1107 Ok(AwsPrivatelinkRule {
1108 pattern: rule.pattern.clone(),
1109 to: plan_privatelink(scx, &rule.tunnel)?,
1110 })
1111 })
1112 .collect::<Result<Vec<_>, PlanError>>()?;
1113 Tunnel::AwsPrivatelinks(mz_storage_types::connections::AwsPrivatelinks { rules })
1114 }
1115 _ => {
1116 sql_bail!("cannot specify both SSH TUNNEL and AWS PRIVATELINK");
1117 }
1118 })
1119}