mz_sql/plan/statement/ddl/
connection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data definition language (DDL) utilities for CONNECTION objects.
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use anyhow::Context;
15use array_concat::concat_arrays;
16use itertools::Itertools;
17use maplit::btreemap;
18use mz_ore::num::NonNeg;
19use mz_ore::str::StrExt;
20use mz_repr::CatalogItemId;
21use mz_sql_parser::ast::ConnectionOptionName::*;
22use mz_sql_parser::ast::display::AstDisplay;
23use mz_sql_parser::ast::{
24    ConnectionDefaultAwsPrivatelink, ConnectionOption, ConnectionOptionName, CreateConnectionType,
25    KafkaBroker, KafkaBrokerAwsPrivatelinkOption, KafkaBrokerAwsPrivatelinkOptionName,
26    KafkaBrokerTunnel,
27};
28use mz_ssh_util::keys::SshKeyPair;
29use mz_storage_types::connections::aws::{
30    AwsAssumeRole, AwsAuth, AwsConnection, AwsConnectionReference, AwsCredentials,
31};
32use mz_storage_types::connections::inline::ReferencedConnection;
33use mz_storage_types::connections::string_or_secret::StringOrSecret;
34use mz_storage_types::connections::{
35    AwsPrivatelink, AwsPrivatelinkConnection, CsrConnection, CsrConnectionHttpAuth,
36    IcebergCatalogConnection, IcebergCatalogImpl, IcebergCatalogType, KafkaConnection,
37    KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, MySqlConnection, MySqlSslMode,
38    PostgresConnection, RestIcebergCatalog, S3TablesRestIcebergCatalog, SqlServerConnectionDetails,
39    SshConnection, SshTunnel, TlsIdentity, Tunnel,
40};
41
42use crate::names::Aug;
43use crate::plan::statement::{Connection, ResolvedItemName};
44use crate::plan::with_options::{self};
45use crate::plan::{ConnectionDetails, PlanError, SshKey, StatementContext};
46use crate::session::vars::{self, ENABLE_AWS_MSK_IAM_AUTH};
47
48generate_extracted_config!(
49    ConnectionOption,
50    (AccessKeyId, StringOrSecret),
51    (AssumeRoleArn, String),
52    (AssumeRoleSessionName, String),
53    (AvailabilityZones, Vec<String>),
54    (AwsConnection, with_options::Object),
55    (AwsPrivatelink, ConnectionDefaultAwsPrivatelink<Aug>),
56    // (AwsPrivatelink, with_options::Object),
57    (Broker, Vec<KafkaBroker<Aug>>),
58    (Brokers, Vec<KafkaBroker<Aug>>),
59    (Credential, StringOrSecret),
60    (Database, String),
61    (Endpoint, String),
62    (Host, String),
63    (Password, with_options::Secret),
64    (Port, u16),
65    (ProgressTopic, String),
66    (ProgressTopicReplicationFactor, i32),
67    (PublicKey1, String),
68    (PublicKey2, String),
69    (Region, String),
70    (SaslMechanisms, String),
71    (SaslPassword, with_options::Secret),
72    (SaslUsername, StringOrSecret),
73    (Scope, String),
74    (SecretAccessKey, with_options::Secret),
75    (SecurityProtocol, String),
76    (ServiceName, String),
77    (SshTunnel, with_options::Object),
78    (SslCertificate, StringOrSecret),
79    (SslCertificateAuthority, StringOrSecret),
80    (SslKey, with_options::Secret),
81    (SslMode, String),
82    (SessionToken, StringOrSecret),
83    (CatalogType, IcebergCatalogType),
84    (Url, String),
85    (User, StringOrSecret),
86    (Warehouse, String)
87);
88
// Generates `KafkaBrokerAwsPrivatelinkOptionExtracted`, the extracted form of the options
// attached to a single Kafka broker's AWS PrivateLink tunnel. It exposes
// `availability_zone`, `port`, and `seen`.
generate_extracted_config!(
    KafkaBrokerAwsPrivatelinkOption,
    (AvailabilityZone, String),
    (Port, u16)
);
94
/// Options which cannot be changed using ALTER CONNECTION.
///
/// Currently these are the progress topic name and its replication factor, both of which
/// are only accepted by Kafka connections.
pub(crate) const INALTERABLE_OPTIONS: &[ConnectionOptionName] =
    &[ProgressTopic, ProgressTopicReplicationFactor];
98
/// Sets of options of which only one may be specified.
///
/// Each inner slice is one such set; `BROKER` and `BROKERS` form the only set today.
pub(crate) const MUTUALLY_EXCLUSIVE_SETS: &[&[ConnectionOptionName]] = &[&[Broker, Brokers]];
101
102pub(super) fn validate_options_per_connection_type(
103    t: CreateConnectionType,
104    mut options: BTreeSet<ConnectionOptionName>,
105) -> Result<(), PlanError> {
106    use mz_sql_parser::ast::ConnectionOptionName::*;
107    let permitted_options = match t {
108        CreateConnectionType::Aws => [
109            AccessKeyId,
110            SecretAccessKey,
111            SessionToken,
112            Endpoint,
113            Region,
114            AssumeRoleArn,
115            AssumeRoleSessionName,
116        ]
117        .as_slice(),
118        CreateConnectionType::AwsPrivatelink => &[AvailabilityZones, Port, ServiceName],
119        CreateConnectionType::Csr => &[
120            AwsPrivatelink,
121            Password,
122            Port,
123            SshTunnel,
124            SslCertificate,
125            SslCertificateAuthority,
126            SslKey,
127            Url,
128            User,
129        ],
130        CreateConnectionType::Kafka => &[
131            AwsConnection,
132            Broker,
133            Brokers,
134            ProgressTopic,
135            ProgressTopicReplicationFactor,
136            AwsPrivatelink,
137            SshTunnel,
138            SslKey,
139            SslCertificate,
140            SslCertificateAuthority,
141            SaslMechanisms,
142            SaslUsername,
143            SaslPassword,
144            SecurityProtocol,
145        ],
146        CreateConnectionType::Postgres => &[
147            AwsPrivatelink,
148            Database,
149            Host,
150            Password,
151            Port,
152            SshTunnel,
153            SslCertificate,
154            SslCertificateAuthority,
155            SslKey,
156            SslMode,
157            User,
158        ],
159        CreateConnectionType::Ssh => &[Host, Port, User, PublicKey1, PublicKey2],
160        CreateConnectionType::MySql => &[
161            AwsPrivatelink,
162            Host,
163            Password,
164            Port,
165            SshTunnel,
166            SslCertificate,
167            SslCertificateAuthority,
168            SslKey,
169            SslMode,
170            User,
171            AwsConnection,
172        ],
173        CreateConnectionType::SqlServer => &[
174            AwsPrivatelink,
175            Database,
176            Host,
177            Password,
178            Port,
179            SshTunnel,
180            SslCertificate,
181            SslCertificateAuthority,
182            SslKey,
183            SslMode,
184            User,
185        ],
186        CreateConnectionType::IcebergCatalog => &[
187            AwsConnection,
188            CatalogType,
189            Credential,
190            Scope,
191            Url,
192            Warehouse,
193        ],
194    };
195
196    for o in permitted_options {
197        options.remove(o);
198    }
199
200    if !options.is_empty() {
201        sql_bail!(
202            "{} connections do not support {} values",
203            t,
204            options.iter().join(", ")
205        )
206    }
207
208    Ok(())
209}
210
211impl ConnectionOptionExtracted {
212    pub(super) fn ensure_only_valid_options(
213        &self,
214        t: CreateConnectionType,
215    ) -> Result<(), PlanError> {
216        validate_options_per_connection_type(t, self.seen.clone())
217    }
218
219    pub fn try_into_connection_details(
220        self,
221        scx: &StatementContext,
222        connection_type: CreateConnectionType,
223    ) -> Result<ConnectionDetails, PlanError> {
224        self.ensure_only_valid_options(connection_type)?;
225
226        let connection: ConnectionDetails = match connection_type {
227            CreateConnectionType::Aws => {
228                let credentials = match (
229                    self.access_key_id,
230                    self.secret_access_key,
231                    self.session_token,
232                ) {
233                    (Some(access_key_id), Some(secret_access_key), session_token) => {
234                        Some(AwsCredentials {
235                            access_key_id,
236                            secret_access_key: secret_access_key.into(),
237                            session_token,
238                        })
239                    }
240                    (None, None, None) => None,
241                    _ => {
242                        sql_bail!(
243                            "must specify both ACCESS KEY ID and SECRET ACCESS KEY with optional SESSION TOKEN"
244                        );
245                    }
246                };
247
248                let assume_role = match (self.assume_role_arn, self.assume_role_session_name) {
249                    (Some(arn), session_name) => Some(AwsAssumeRole { arn, session_name }),
250                    (None, Some(_)) => {
251                        sql_bail!(
252                            "must specify ASSUME ROLE ARN with optional ASSUME ROLE SESSION NAME"
253                        );
254                    }
255                    _ => None,
256                };
257
258                let auth = match (credentials, assume_role) {
259                    (None, None) => sql_bail!(
260                        "must specify either ASSUME ROLE ARN or ACCESS KEY ID and SECRET ACCESS KEY"
261                    ),
262                    (Some(credentials), None) => AwsAuth::Credentials(credentials),
263                    (None, Some(assume_role)) => AwsAuth::AssumeRole(assume_role),
264                    (Some(_), Some(_)) => {
265                        sql_bail!("cannot specify both ACCESS KEY ID and ASSUME ROLE ARN");
266                    }
267                };
268
269                ConnectionDetails::Aws(AwsConnection {
270                    auth,
271                    endpoint: match self.endpoint {
272                        // TODO(benesch): this should not treat an empty endpoint as equivalent to a `NULL`
273                        // endpoint, but making that change now would break testdrive. AWS connections are
274                        // all behind feature flags mode right now, so no particular urgency to correct
275                        // this.
276                        Some(endpoint) if !endpoint.is_empty() => Some(endpoint),
277                        _ => None,
278                    },
279                    region: self.region,
280                })
281            }
282            CreateConnectionType::AwsPrivatelink => {
283                let connection = AwsPrivatelinkConnection {
284                    service_name: self
285                        .service_name
286                        .ok_or_else(|| sql_err!("SERVICE NAME option is required"))?,
287                    availability_zones: self
288                        .availability_zones
289                        .ok_or_else(|| sql_err!("AVAILABILITY ZONES option is required"))?,
290                };
291                if let Some(supported_azs) = scx.catalog.aws_privatelink_availability_zones() {
292                    let mut unique_azs: BTreeSet<String> = BTreeSet::new();
293                    let mut duplicate_azs: BTreeSet<String> = BTreeSet::new();
294                    // Validate each AZ is supported
295                    for connection_az in &connection.availability_zones {
296                        if unique_azs.contains(connection_az) {
297                            duplicate_azs.insert(connection_az.to_string());
298                        } else {
299                            unique_azs.insert(connection_az.to_string());
300                        }
301                        if !supported_azs.contains(connection_az) {
302                            return Err(PlanError::InvalidPrivatelinkAvailabilityZone {
303                                name: connection_az.to_string(),
304                                supported_azs,
305                            });
306                        }
307                    }
308                    if duplicate_azs.len() > 0 {
309                        return Err(PlanError::DuplicatePrivatelinkAvailabilityZone {
310                            duplicate_azs,
311                        });
312                    }
313                }
314                ConnectionDetails::AwsPrivatelink(connection)
315            }
316            CreateConnectionType::Kafka => {
317                let (tls, sasl) = plan_kafka_security(scx, &self)?;
318
319                ConnectionDetails::Kafka(KafkaConnection {
320                    brokers: self.get_brokers(scx)?,
321                    default_tunnel: scx
322                        .build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?,
323                    progress_topic: self.progress_topic,
324                    progress_topic_options: KafkaTopicOptions {
325                        // We only allow configuring the progress topic replication factor for now.
326                        // For correctness, the partition count MUST be one and for performance the compaction
327                        // policy MUST be enabled.
328                        partition_count: Some(NonNeg::try_from(1).expect("1 is positive")),
329                        replication_factor: self.progress_topic_replication_factor.map(|val| {
330                            if val <= 0 {
331                                Err(sql_err!("invalid CONNECTION: PROGRESS TOPIC REPLICATION FACTOR must be greater than 0"))?
332                            }
333                            NonNeg::try_from(val).map_err(|e| sql_err!("{e}"))
334                        }).transpose()?,
335                        topic_config: btreemap! {
336                            "cleanup.policy".to_string() => "compact".to_string(),
337                        },
338                    },
339                    options: BTreeMap::new(),
340                    tls,
341                    sasl,
342                })
343            }
344            CreateConnectionType::Csr => {
345                let url: reqwest::Url = match self.url {
346                    Some(url) => url
347                        .parse()
348                        .map_err(|e| sql_err!("parsing schema registry url: {e}"))?,
349                    None => sql_bail!("invalid CONNECTION: must specify URL"),
350                };
351                let _ = url
352                    .host_str()
353                    .ok_or_else(|| sql_err!("invalid CONNECTION: URL must specify domain name"))?;
354                if url.path() != "/" {
355                    sql_bail!("invalid CONNECTION: URL must have an empty path");
356                }
357                let cert = self.ssl_certificate;
358                let key = self.ssl_key.map(|secret| secret.into());
359                let tls_identity = match (cert, key) {
360                    (None, None) => None,
361                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
362                    _ => sql_bail!(
363                        "invalid CONNECTION: reading from SSL-auth Confluent Schema Registry requires both SSL KEY and SSL CERTIFICATE"
364                    ),
365                };
366                let http_auth = self.user.map(|username| CsrConnectionHttpAuth {
367                    username,
368                    password: self.password.map(|secret| secret.into()),
369                });
370
371                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
372                if let Some(privatelink) = self.aws_privatelink.as_ref() {
373                    if privatelink.port.is_some() {
374                        sql_bail!(
375                            "invalid CONNECTION: CONFLUENT SCHEMA REGISTRY does not support PORT for AWS PRIVATELINK"
376                        )
377                    }
378                }
379                let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;
380
381                ConnectionDetails::Csr(CsrConnection {
382                    url,
383                    tls_root_cert: self.ssl_certificate_authority,
384                    tls_identity,
385                    http_auth,
386                    tunnel,
387                })
388            }
389            CreateConnectionType::Postgres => {
390                let cert = self.ssl_certificate;
391                let key = self.ssl_key.map(|secret| secret.into());
392                let tls_identity = match (cert, key) {
393                    (None, None) => None,
394                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
395                    _ => sql_bail!(
396                        "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
397                    ),
398                };
399                let tls_mode = match self.ssl_mode.as_ref().map(|m| m.as_str()) {
400                    None | Some("disable") => tokio_postgres::config::SslMode::Disable,
401                    // "prefer" intentionally omitted because it has dubious security
402                    // properties.
403                    Some("require") | Some("required") => tokio_postgres::config::SslMode::Require,
404                    Some("verify_ca") | Some("verify-ca") => {
405                        tokio_postgres::config::SslMode::VerifyCa
406                    }
407                    Some("verify_full") | Some("verify-full") => {
408                        tokio_postgres::config::SslMode::VerifyFull
409                    }
410                    Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
411                };
412
413                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
414                if let Some(privatelink) = self.aws_privatelink.as_ref() {
415                    if privatelink.port.is_some() {
416                        sql_bail!(
417                            "invalid CONNECTION: POSTGRES does not support PORT for AWS PRIVATELINK"
418                        )
419                    }
420                }
421                let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;
422
423                ConnectionDetails::Postgres(PostgresConnection {
424                    database: self
425                        .database
426                        .ok_or_else(|| sql_err!("DATABASE option is required"))?,
427                    password: self.password.map(|password| password.into()),
428                    host: self
429                        .host
430                        .ok_or_else(|| sql_err!("HOST option is required"))?,
431                    port: self.port.unwrap_or(5432_u16),
432                    tunnel,
433                    tls_mode,
434                    tls_root_cert: self.ssl_certificate_authority,
435                    tls_identity,
436                    user: self
437                        .user
438                        .ok_or_else(|| sql_err!("USER option is required"))?,
439                })
440            }
441            CreateConnectionType::Ssh => {
442                let ensure_key = |public_key| match public_key {
443                    Some(public_key) => Ok::<_, anyhow::Error>(SshKey::PublicOnly(public_key)),
444                    None => {
445                        let key = SshKeyPair::new().context("creating SSH key")?;
446                        Ok(SshKey::Both(key))
447                    }
448                };
449                ConnectionDetails::Ssh {
450                    connection: SshConnection {
451                        host: self
452                            .host
453                            .ok_or_else(|| sql_err!("HOST option is required"))?,
454                        port: self.port.unwrap_or(22_u16),
455                        user: match self
456                            .user
457                            .ok_or_else(|| sql_err!("USER option is required"))?
458                        {
459                            StringOrSecret::String(user) => user,
460                            StringOrSecret::Secret(_) => {
461                                sql_bail!(
462                                    "SSH connections do not support supplying USER value as SECRET"
463                                )
464                            }
465                        },
466                    },
467                    key_1: ensure_key(self.public_key1)?,
468                    key_2: ensure_key(self.public_key2)?,
469                }
470            }
471            CreateConnectionType::MySql => {
472                let aws_connection = get_aws_connection_reference(scx, &self)?;
473                if aws_connection.is_some() && self.password.is_some() {
474                    sql_bail!(
475                        "invalid CONNECTION: AWS IAM authentication is not supported with password"
476                    );
477                }
478
479                let cert = self.ssl_certificate;
480                let key = self.ssl_key.map(|secret| secret.into());
481                let tls_identity = match (cert, key) {
482                    (None, None) => None,
483                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
484                    _ => sql_bail!(
485                        "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
486                    ),
487                };
488                // Accepts the same SSL Mode values as the MySQL Client
489                // https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-mode
490                let tls_mode = match self
491                    .ssl_mode
492                    .map(|f| f.to_uppercase())
493                    .as_ref()
494                    .map(|m| m.as_str())
495                {
496                    None | Some("DISABLED") => {
497                        if aws_connection.is_some() {
498                            sql_bail!(
499                                "invalid CONNECTION: AWS IAM authentication requires SSL to be enabled"
500                            )
501                        }
502                        MySqlSslMode::Disabled
503                    }
504                    // "preferred" intentionally omitted because it has dubious security
505                    // properties.
506                    Some("REQUIRED") | Some("REQUIRE") => MySqlSslMode::Required,
507                    Some("VERIFY_CA") | Some("VERIFY-CA") => MySqlSslMode::VerifyCa,
508                    Some("VERIFY_IDENTITY") | Some("VERIFY-IDENTITY") => {
509                        MySqlSslMode::VerifyIdentity
510                    }
511                    Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
512                };
513
514                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
515                if let Some(privatelink) = self.aws_privatelink.as_ref() {
516                    if privatelink.port.is_some() {
517                        sql_bail!(
518                            "invalid CONNECTION: MYSQL does not support PORT for AWS PRIVATELINK"
519                        )
520                    }
521                }
522                let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;
523
524                ConnectionDetails::MySql(MySqlConnection {
525                    password: self.password.map(|password| password.into()),
526                    host: self
527                        .host
528                        .ok_or_else(|| sql_err!("HOST option is required"))?,
529                    port: self.port.unwrap_or(3306_u16),
530                    tunnel,
531                    tls_mode,
532                    tls_root_cert: self.ssl_certificate_authority,
533                    tls_identity,
534                    user: self
535                        .user
536                        .ok_or_else(|| sql_err!("USER option is required"))?,
537                    aws_connection,
538                })
539            }
540            CreateConnectionType::SqlServer => {
541                scx.require_feature_flag(&vars::ENABLE_SQL_SERVER_SOURCE)?;
542
543                let aws_connection = get_aws_connection_reference(scx, &self)?;
544                if aws_connection.is_some() && self.password.is_some() {
545                    sql_bail!(
546                        "invalid CONNECTION: AWS IAM authentication is not supported with password"
547                    );
548                }
549
550                let (encryption, certificate_validation_policy) = match self
551                    .ssl_mode
552                    .map(|mode| mode.to_uppercase())
553                    .as_ref()
554                    .map(|mode| mode.as_str())
555                {
556                    None | Some("DISABLED") => (
557                        mz_sql_server_util::config::EncryptionLevel::None,
558                        mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
559                    ),
560                    Some("REQUIRED") => (
561                        mz_sql_server_util::config::EncryptionLevel::Required,
562                        mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
563                    ),
564                    Some("VERIFY") => (
565                        mz_sql_server_util::config::EncryptionLevel::Required,
566                        mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem,
567                    ),
568                    Some("VERIFY_CA") => {
569                        if self.ssl_certificate_authority.is_none() {
570                            sql_bail!(
571                                "invalid CONNECTION: SSL MODE 'verify_ca' requires SSL CERTIFICATE AUTHORITY"
572                            );
573                        }
574                        (
575                            mz_sql_server_util::config::EncryptionLevel::Required,
576                            mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA,
577                        )
578                    }
579                    Some(mode) => {
580                        sql_bail!("invalid CONNECTION: unknown SSL MODE {}", mode.quoted())
581                    }
582                };
583
584                if let Some(privatelink) = self.aws_privatelink.as_ref() {
585                    if privatelink.port.is_some() {
586                        sql_bail!(
587                            "invalid CONNECTION: SQL SERVER does not support PORT for AWS PRIVATELINK"
588                        )
589                    }
590                }
591
592                // 1433 is the default port for SQL Server instances running over TCP.
593                //
594                // See: <https://learn.microsoft.com/en-us/sql/database-engine/configure-windows/configure-a-server-to-listen-on-a-specific-tcp-port?view=sql-server-ver16>
595                let port = self.port.unwrap_or(1433_u16);
596                let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;
597
598                ConnectionDetails::SqlServer(SqlServerConnectionDetails {
599                    host: self
600                        .host
601                        .ok_or_else(|| sql_err!("HOST option is required"))?,
602                    port,
603                    database: self
604                        .database
605                        .ok_or_else(|| sql_err!("DATABASE option is required"))?,
606                    user: self
607                        .user
608                        .ok_or_else(|| sql_err!("USER option is required"))?,
609                    password: self
610                        .password
611                        .ok_or_else(|| sql_err!("PASSWORD option is required"))
612                        .map(|pass| pass.into())?,
613                    tunnel,
614                    encryption,
615                    certificate_validation_policy,
616                    tls_root_cert: self.ssl_certificate_authority,
617                })
618            }
619            CreateConnectionType::IcebergCatalog => {
620                let catalog_type = self.catalog_type.clone().ok_or_else(|| {
621                    sql_err!("invalid CONNECTION: ICEBERG connections must specify CATALOG TYPE")
622                })?;
623
624                let uri: reqwest::Url = match &self.url {
625                    Some(url) => url
626                        .parse()
627                        .map_err(|e| sql_err!("parsing Iceberg catalog url: {e}"))?,
628                    None => sql_bail!("invalid CONNECTION: must specify URL"),
629                };
630
631                let warehouse = self.warehouse.clone();
632                let credential = self.credential.clone();
633                let aws_connection = get_aws_connection_reference(scx, &self)?;
634
635                let catalog = match catalog_type {
636                    IcebergCatalogType::S3TablesRest => {
637                        let Some(warehouse) = warehouse else {
638                            sql_bail!(
639                                "invalid CONNECTION: ICEBERG s3tablesrest connections must specify WAREHOUSE"
640                            );
641                        };
642                        let Some(aws_connection) = aws_connection else {
643                            sql_bail!(
644                                "invalid CONNECTION: ICEBERG s3tablesrest connections require an AWS connection"
645                            );
646                        };
647
648                        IcebergCatalogImpl::S3TablesRest(S3TablesRestIcebergCatalog {
649                            aws_connection,
650                            warehouse,
651                        })
652                    }
653                    IcebergCatalogType::Rest => {
654                        let Some(credential) = credential else {
655                            sql_bail!(
656                                "invalid CONNECTION: ICEBERG rest connections require a CREDENTIAL"
657                            );
658                        };
659
660                        IcebergCatalogImpl::Rest(RestIcebergCatalog {
661                            credential,
662                            scope: self.scope.clone(),
663                            warehouse,
664                        })
665                    }
666                };
667
668                ConnectionDetails::IcebergCatalog(IcebergCatalogConnection { catalog, uri })
669            }
670        };
671
672        Ok(connection)
673    }
674
675    pub fn get_brokers(
676        &self,
677        scx: &StatementContext,
678    ) -> Result<Vec<mz_storage_types::connections::KafkaBroker<ReferencedConnection>>, PlanError>
679    {
680        let mut brokers = match (&self.broker, &self.brokers, &self.aws_privatelink) {
681            (Some(v), None, None) => v.to_vec(),
682            (None, Some(v), None) => v.to_vec(),
683            (None, None, Some(_)) => vec![],
684            (None, None, None) => {
685                sql_bail!("invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK")
686            }
687            _ => sql_bail!(
688                "invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK"
689            ),
690        };
691
692        // NOTE: we allow broker configurations to be mixed and matched. If/when we support
693        // a top-level `SSH TUNNEL` configuration, we will need additional assertions.
694
695        let mut out = vec![];
696        for broker in &mut brokers {
697            if broker.address.contains(',') {
698                sql_bail!("invalid CONNECTION: cannot specify multiple Kafka broker addresses in one string.\n\n
699Instead, specify BROKERS using multiple strings, e.g. BROKERS ('kafka:9092', 'kafka:9093')");
700            }
701
702            let tunnel = match &broker.tunnel {
703                KafkaBrokerTunnel::Direct => Tunnel::Direct,
704                KafkaBrokerTunnel::AwsPrivatelink(aws_privatelink) => {
705                    let KafkaBrokerAwsPrivatelinkOptionExtracted {
706                        availability_zone,
707                        port,
708                        seen: _,
709                    } = KafkaBrokerAwsPrivatelinkOptionExtracted::try_from(
710                        aws_privatelink.options.clone(),
711                    )?;
712
713                    let id = match &aws_privatelink.connection {
714                        ResolvedItemName::Item { id, .. } => id,
715                        _ => sql_bail!(
716                            "internal error: Kafka PrivateLink connection was not resolved"
717                        ),
718                    };
719                    let entry = scx.catalog.get_item(id);
720                    match entry.connection()? {
721                        Connection::AwsPrivatelink(connection) => {
722                            if let Some(az) = &availability_zone {
723                                if !connection.availability_zones.contains(az) {
724                                    sql_bail!(
725                                        "AWS PrivateLink availability zone {} does not match any of the \
726                                      availability zones on the AWS PrivateLink connection {}",
727                                        az.quoted(),
728                                        scx.catalog
729                                            .resolve_full_name(entry.name())
730                                            .to_string()
731                                            .quoted()
732                                    )
733                                }
734                            }
735                            Tunnel::AwsPrivatelink(AwsPrivatelink {
736                                connection_id: *id,
737                                availability_zone,
738                                port,
739                            })
740                        }
741                        _ => {
742                            sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item)
743                        }
744                    }
745                }
746                KafkaBrokerTunnel::SshTunnel(ssh) => {
747                    let id = match &ssh {
748                        ResolvedItemName::Item { id, .. } => id,
749                        _ => sql_bail!(
750                            "internal error: Kafka SSH tunnel connection was not resolved"
751                        ),
752                    };
753                    let ssh_tunnel = scx.catalog.get_item(id);
754                    match ssh_tunnel.connection()? {
755                        Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
756                            connection_id: *id,
757                            connection: *id,
758                        }),
759                        _ => {
760                            sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item)
761                        }
762                    }
763                }
764            };
765
766            out.push(mz_storage_types::connections::KafkaBroker {
767                address: broker.address.clone(),
768                tunnel,
769            });
770        }
771
772        Ok(out)
773    }
774}
775
776fn get_aws_connection_reference(
777    scx: &StatementContext,
778    conn_options: &ConnectionOptionExtracted,
779) -> Result<Option<AwsConnectionReference<ReferencedConnection>>, PlanError> {
780    let Some(aws_connection_id) = conn_options.aws_connection else {
781        return Ok(None);
782    };
783
784    let id = CatalogItemId::from(aws_connection_id);
785    let item = scx.catalog.get_item(&id);
786    Ok(match item.connection()? {
787        Connection::Aws(_) => Some(AwsConnectionReference {
788            connection_id: id,
789            connection: id,
790        }),
791        _ => sql_bail!("{} is not an AWS connection", item.name().item),
792    })
793}
794
795fn plan_kafka_security(
796    scx: &StatementContext,
797    v: &ConnectionOptionExtracted,
798) -> Result<
799    (
800        Option<KafkaTlsConfig>,
801        Option<KafkaSaslConfig<ReferencedConnection>>,
802    ),
803    PlanError,
804> {
805    const SASL_CONFIGS: [ConnectionOptionName; 4] = [
806        ConnectionOptionName::AwsConnection,
807        ConnectionOptionName::SaslMechanisms,
808        ConnectionOptionName::SaslUsername,
809        ConnectionOptionName::SaslPassword,
810    ];
811
812    const ALL_CONFIGS: [ConnectionOptionName; 7] = concat_arrays!(
813        [
814            ConnectionOptionName::SslKey,
815            ConnectionOptionName::SslCertificate,
816            ConnectionOptionName::SslCertificateAuthority,
817        ],
818        SASL_CONFIGS
819    );
820
821    enum SecurityProtocol {
822        Plaintext,
823        Ssl,
824        SaslPlaintext,
825        SaslSsl,
826    }
827
828    let security_protocol = v.security_protocol.as_ref().map(|v| v.to_uppercase());
829    let security_protocol = match security_protocol.as_deref() {
830        Some("PLAINTEXT") => SecurityProtocol::Plaintext,
831        Some("SSL") => SecurityProtocol::Ssl,
832        Some("SASL_PLAINTEXT") => SecurityProtocol::SaslPlaintext,
833        Some("SASL_SSL") => SecurityProtocol::SaslSsl,
834        Some(p) => sql_bail!("unknown security protocol: {}", p),
835        // To be secure by default, if no security protocol is explicitly
836        // specified, we always choose one of the SSL-enabled protocols, using
837        // the presence of any SASL options to guide us between them. Users must
838        // explicitly choose a plaintext mechanism if that's what they want.
839        None if SASL_CONFIGS.iter().any(|c| v.seen.contains(c)) => SecurityProtocol::SaslSsl,
840        None => SecurityProtocol::Ssl,
841    };
842
843    let mut outstanding = ALL_CONFIGS
844        .into_iter()
845        .filter(|c| v.seen.contains(c))
846        .collect::<BTreeSet<ConnectionOptionName>>();
847
848    let tls = match security_protocol {
849        SecurityProtocol::Ssl | SecurityProtocol::SaslSsl => {
850            outstanding.remove(&ConnectionOptionName::SslCertificate);
851            let identity = match &v.ssl_certificate {
852                None => None,
853                Some(cert) => {
854                    outstanding.remove(&ConnectionOptionName::SslKey);
855                    let Some(key) = &v.ssl_key else {
856                        sql_bail!("SSL KEY must be specified with SSL CERTIFICATE");
857                    };
858                    Some(TlsIdentity {
859                        cert: cert.clone(),
860                        key: (*key).into(),
861                    })
862                }
863            };
864            outstanding.remove(&ConnectionOptionName::SslCertificateAuthority);
865            Some(KafkaTlsConfig {
866                identity,
867                root_cert: v.ssl_certificate_authority.clone(),
868            })
869        }
870        _ => None,
871    };
872
873    let sasl = match security_protocol {
874        SecurityProtocol::SaslPlaintext | SecurityProtocol::SaslSsl => {
875            outstanding.remove(&ConnectionOptionName::AwsConnection);
876            match get_aws_connection_reference(scx, v)? {
877                Some(aws) => {
878                    scx.require_feature_flag(&ENABLE_AWS_MSK_IAM_AUTH)?;
879                    Some(KafkaSaslConfig {
880                        mechanism: "OAUTHBEARER".into(),
881                        username: "".into(),
882                        password: None,
883                        aws: Some(aws),
884                    })
885                }
886                None => {
887                    outstanding.remove(&ConnectionOptionName::SaslMechanisms);
888                    outstanding.remove(&ConnectionOptionName::SaslUsername);
889                    outstanding.remove(&ConnectionOptionName::SaslPassword);
890                    // TODO(benesch): support a less confusing `SASL MECHANISM`
891                    // alias, as only a single mechanism that can be specified.
892                    let Some(mechanism) = &v.sasl_mechanisms else {
893                        sql_bail!("SASL MECHANISMS must be specified");
894                    };
895                    let Some(username) = &v.sasl_username else {
896                        sql_bail!("SASL USERNAME must be specified");
897                    };
898                    let Some(password) = &v.sasl_password else {
899                        sql_bail!("SASL PASSWORD must be specified");
900                    };
901                    Some(KafkaSaslConfig {
902                        // librdkafka requires SASL mechanisms to be upper case (PLAIN,
903                        // SCRAM-SHA-256). For usability, we automatically uppercase the
904                        // mechanism that user provides. This avoids a frustrating
905                        // interaction with identifier case folding. Consider `SASL
906                        // MECHANISMS = PLAIN`. Identifier case folding results in a
907                        // SASL mechanism of `plain` (note the lowercase), which
908                        // Materialize previously rejected with an error of "SASL
909                        // mechanism must be uppercase." This was deeply frustarting for
910                        // users who were not familiar with identifier case folding
911                        // rules. See database-issues#6693.
912                        mechanism: mechanism.to_uppercase(),
913                        username: username.clone(),
914                        password: Some((*password).into()),
915                        aws: None,
916                    })
917                }
918            }
919        }
920        _ => None,
921    };
922
923    if let Some(outstanding) = outstanding.first() {
924        sql_bail!("option {outstanding} not supported with this configuration");
925    }
926
927    Ok((tls, sasl))
928}