mz_sql/plan/statement/ddl/connection.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Data definition language (DDL) utilities for CONNECTION objects.

use std::collections::{BTreeMap, BTreeSet};

use anyhow::Context;
use array_concat::concat_arrays;
use itertools::Itertools;
use maplit::btreemap;
use mz_ore::num::NonNeg;
use mz_ore::str::StrExt;
use mz_repr::CatalogItemId;
use mz_sql_parser::ast::ConnectionOptionName::*;
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    ConnectionDefaultAwsPrivatelink, ConnectionOption, ConnectionOptionName, CreateConnectionType,
    KafkaBroker, KafkaBrokerAwsPrivatelinkOption, KafkaBrokerAwsPrivatelinkOptionName,
    KafkaBrokerTunnel,
};
use mz_ssh_util::keys::SshKeyPair;
use mz_storage_types::connections::aws::{
    AwsAssumeRole, AwsAuth, AwsConnection, AwsConnectionReference, AwsCredentials,
};
use mz_storage_types::connections::inline::ReferencedConnection;
use mz_storage_types::connections::string_or_secret::StringOrSecret;
use mz_storage_types::connections::{
    AwsPrivatelink, AwsPrivatelinkConnection, CsrConnection, CsrConnectionHttpAuth,
    IcebergCatalogConnection, IcebergCatalogImpl, IcebergCatalogType, KafkaConnection,
    KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, MySqlConnection, MySqlSslMode,
    PostgresConnection, RestIcebergCatalog, S3TablesRestIcebergCatalog, SqlServerConnectionDetails,
    SshConnection, SshTunnel, TlsIdentity, Tunnel,
};

use crate::names::Aug;
use crate::plan::statement::{Connection, ResolvedItemName};
use crate::plan::with_options::{self};
use crate::plan::{ConnectionDetails, PlanError, SshKey, StatementContext};
use crate::session::vars::{self, ENABLE_AWS_MSK_IAM_AUTH};
generate_extracted_config!(
    ConnectionOption,
    (AccessKeyId, StringOrSecret),
    (AssumeRoleArn, String),
    (AssumeRoleSessionName, String),
    (AvailabilityZones, Vec<String>),
    (AwsConnection, with_options::Object),
    (AwsPrivatelink, ConnectionDefaultAwsPrivatelink<Aug>),
    // (AwsPrivatelink, with_options::Object),
    (Broker, Vec<KafkaBroker<Aug>>),
    (Brokers, Vec<KafkaBroker<Aug>>),
    (Credential, StringOrSecret),
    (Database, String),
    (Endpoint, String),
    (Host, String),
    (Password, with_options::Secret),
    (Port, u16),
    (ProgressTopic, String),
    (ProgressTopicReplicationFactor, i32),
    (PublicKey1, String),
    (PublicKey2, String),
    (Region, String),
    (SaslMechanisms, String),
    (SaslPassword, with_options::Secret),
    (SaslUsername, StringOrSecret),
    (Scope, String),
    (SecretAccessKey, with_options::Secret),
    (SecurityProtocol, String),
    (ServiceName, String),
    (SshTunnel, with_options::Object),
    (SslCertificate, StringOrSecret),
    (SslCertificateAuthority, StringOrSecret),
    (SslKey, with_options::Secret),
    (SslMode, String),
    (SessionToken, StringOrSecret),
    (CatalogType, IcebergCatalogType),
    (Url, String),
    (User, StringOrSecret),
    (Warehouse, String)
);

generate_extracted_config!(
    KafkaBrokerAwsPrivatelinkOption,
    (AvailabilityZone, String),
    (Port, u16)
);
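
// `generate_extracted_config!` expands (approximately -- this is a hand-written
// sketch, not the actual expansion) to a struct with one snake_case field per
// option plus a `seen` set recording which options were specified, which is how
// `ConnectionOptionExtracted` is consumed throughout this module:
//
//     pub struct ConnectionOptionExtracted {
//         pub seen: BTreeSet<ConnectionOptionName>,
//         pub access_key_id: Option<StringOrSecret>,
//         pub assume_role_arn: Option<String>,
//         // ...one field per option listed above...
//     }
//
//     impl TryFrom<Vec<ConnectionOption<Aug>>> for ConnectionOptionExtracted { /* ... */ }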

/// Options which cannot be changed using ALTER CONNECTION.
pub(crate) const INALTERABLE_OPTIONS: &[ConnectionOptionName] =
    &[ProgressTopic, ProgressTopicReplicationFactor];

/// Options of which only one may be specified.
pub(crate) const MUTUALLY_EXCLUSIVE_SETS: &[&[ConnectionOptionName]] = &[&[Broker, Brokers]];
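
// For example (hypothetical statements): a Kafka connection may set PROGRESS TOPIC
// at creation time,
//
//     CREATE CONNECTION kafka_conn TO KAFKA (
//         BROKER 'broker0:9092',
//         PROGRESS TOPIC 'my_progress_topic',
//         SECURITY PROTOCOL = 'PLAINTEXT'
//     );
//
// but `ALTER CONNECTION kafka_conn SET (PROGRESS TOPIC = 'other')` is rejected
// because the option is inalterable, and specifying both BROKER and BROKERS in one
// statement is rejected because the two are mutually exclusive.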
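/// Checks that `options` contains only options permitted for connection type `t`;
/// e.g., hypothetically, specifying `DATABASE` on a Kafka connection fails with an
/// error of the form "KAFKA connections do not support DATABASE values".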
pub(super) fn validate_options_per_connection_type(
    t: CreateConnectionType,
    mut options: BTreeSet<ConnectionOptionName>,
) -> Result<(), PlanError> {
    use mz_sql_parser::ast::ConnectionOptionName::*;
    let permitted_options = match t {
        CreateConnectionType::Aws => [
            AccessKeyId,
            SecretAccessKey,
            SessionToken,
            Endpoint,
            Region,
            AssumeRoleArn,
            AssumeRoleSessionName,
        ]
        .as_slice(),
        CreateConnectionType::AwsPrivatelink => &[AvailabilityZones, Port, ServiceName],
        CreateConnectionType::Csr => &[
            AwsPrivatelink,
            Password,
            Port,
            SshTunnel,
            SslCertificate,
            SslCertificateAuthority,
            SslKey,
            Url,
            User,
        ],
        CreateConnectionType::Kafka => &[
            AwsConnection,
            Broker,
            Brokers,
            ProgressTopic,
            ProgressTopicReplicationFactor,
            AwsPrivatelink,
            SshTunnel,
            SslKey,
            SslCertificate,
            SslCertificateAuthority,
            SaslMechanisms,
            SaslUsername,
            SaslPassword,
            SecurityProtocol,
        ],
        CreateConnectionType::Postgres => &[
            AwsPrivatelink,
            Database,
            Host,
            Password,
            Port,
            SshTunnel,
            SslCertificate,
            SslCertificateAuthority,
            SslKey,
            SslMode,
            User,
        ],
        CreateConnectionType::Ssh => &[Host, Port, User, PublicKey1, PublicKey2],
        CreateConnectionType::MySql => &[
            AwsPrivatelink,
            Host,
            Password,
            Port,
            SshTunnel,
            SslCertificate,
            SslCertificateAuthority,
            SslKey,
            SslMode,
            User,
            AwsConnection,
        ],
        CreateConnectionType::SqlServer => &[
            AwsPrivatelink,
            Database,
            Host,
            Password,
            Port,
            SshTunnel,
            SslCertificate,
            SslCertificateAuthority,
            SslKey,
            SslMode,
            User,
        ],
        CreateConnectionType::IcebergCatalog => &[
            AwsConnection,
            CatalogType,
            Credential,
            Scope,
            Url,
            Warehouse,
        ],
    };

    for o in permitted_options {
        options.remove(o);
    }

    if !options.is_empty() {
        sql_bail!(
            "{} connections do not support {} values",
            t,
            options.iter().join(", ")
        )
    }

    Ok(())
}

impl ConnectionOptionExtracted {
    pub(super) fn ensure_only_valid_options(
        &self,
        t: CreateConnectionType,
    ) -> Result<(), PlanError> {
        validate_options_per_connection_type(t, self.seen.clone())
    }

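    /// Converts the extracted options into a [`ConnectionDetails`] for the given
    /// connection type, validating options and filling in defaults along the way.
    ///
    /// For example, a hypothetical statement like the following (option names as
    /// used in this module; exact syntax per the CREATE CONNECTION documentation):
    ///
    /// ```sql
    /// CREATE CONNECTION pg TO POSTGRES (
    ///     HOST = 'db.example.com',
    ///     DATABASE = 'materialize',
    ///     USER = 'mz_user',
    ///     PASSWORD = SECRET pg_password,
    ///     SSL MODE = 'require'
    /// );
    /// ```
    ///
    /// would plan to a `ConnectionDetails::Postgres` with the default port 5432 and
    /// TLS mode `Require`.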
    pub fn try_into_connection_details(
        self,
        scx: &StatementContext,
        connection_type: CreateConnectionType,
    ) -> Result<ConnectionDetails, PlanError> {
        self.ensure_only_valid_options(connection_type)?;

        let connection: ConnectionDetails = match connection_type {
            CreateConnectionType::Aws => {
                let credentials = match (
                    self.access_key_id,
                    self.secret_access_key,
                    self.session_token,
                ) {
                    (Some(access_key_id), Some(secret_access_key), session_token) => {
                        Some(AwsCredentials {
                            access_key_id,
                            secret_access_key: secret_access_key.into(),
                            session_token,
                        })
                    }
                    (None, None, None) => None,
                    _ => {
                        sql_bail!(
                            "must specify both ACCESS KEY ID and SECRET ACCESS KEY with optional SESSION TOKEN"
                        );
                    }
                };

                let assume_role = match (self.assume_role_arn, self.assume_role_session_name) {
                    (Some(arn), session_name) => Some(AwsAssumeRole { arn, session_name }),
                    (None, Some(_)) => {
                        sql_bail!(
                            "must specify ASSUME ROLE ARN with optional ASSUME ROLE SESSION NAME"
                        );
                    }
                    _ => None,
                };

                let auth = match (credentials, assume_role) {
                    (None, None) => sql_bail!(
                        "must specify either ASSUME ROLE ARN or ACCESS KEY ID and SECRET ACCESS KEY"
                    ),
                    (Some(credentials), None) => AwsAuth::Credentials(credentials),
                    (None, Some(assume_role)) => AwsAuth::AssumeRole(assume_role),
                    (Some(_), Some(_)) => {
                        sql_bail!("cannot specify both ACCESS KEY ID and ASSUME ROLE ARN");
                    }
                };

                ConnectionDetails::Aws(AwsConnection {
                    auth,
                    endpoint: match self.endpoint {
                        // TODO(benesch): this should not treat an empty endpoint as equivalent to a `NULL`
                        // endpoint, but making that change now would break testdrive. AWS connections are
                        // all behind feature flags right now, so there is no particular urgency to
                        // correct this.
                        Some(endpoint) if !endpoint.is_empty() => Some(endpoint),
                        _ => None,
                    },
                    region: self.region,
                })
            }
            CreateConnectionType::AwsPrivatelink => {
                let connection = AwsPrivatelinkConnection {
                    service_name: self
                        .service_name
                        .ok_or_else(|| sql_err!("SERVICE NAME option is required"))?,
                    availability_zones: self
                        .availability_zones
                        .ok_or_else(|| sql_err!("AVAILABILITY ZONES option is required"))?,
                };
                if let Some(supported_azs) = scx.catalog.aws_privatelink_availability_zones() {
                    let mut unique_azs: BTreeSet<String> = BTreeSet::new();
                    let mut duplicate_azs: BTreeSet<String> = BTreeSet::new();
                    // Validate each AZ is supported
                    for connection_az in &connection.availability_zones {
                        if unique_azs.contains(connection_az) {
                            duplicate_azs.insert(connection_az.to_string());
                        } else {
                            unique_azs.insert(connection_az.to_string());
                        }
                        if !supported_azs.contains(connection_az) {
                            return Err(PlanError::InvalidPrivatelinkAvailabilityZone {
                                name: connection_az.to_string(),
                                supported_azs,
                            });
                        }
                    }
                    if !duplicate_azs.is_empty() {
                        return Err(PlanError::DuplicatePrivatelinkAvailabilityZone {
                            duplicate_azs,
                        });
                    }
                }
                ConnectionDetails::AwsPrivatelink(connection)
            }
            CreateConnectionType::Kafka => {
                let (tls, sasl) = plan_kafka_security(scx, &self)?;

                ConnectionDetails::Kafka(KafkaConnection {
                    brokers: self.get_brokers(scx)?,
                    default_tunnel: scx
                        .build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?,
                    progress_topic: self.progress_topic,
                    progress_topic_options: KafkaTopicOptions {
                        // We only allow configuring the progress topic replication factor for now.
                        // For correctness, the partition count MUST be one and for performance the compaction
                        // policy MUST be enabled.
                        partition_count: Some(NonNeg::try_from(1).expect("1 is positive")),
                        replication_factor: self.progress_topic_replication_factor.map(|val| {
                            if val <= 0 {
                                Err(sql_err!("invalid CONNECTION: PROGRESS TOPIC REPLICATION FACTOR must be greater than 0"))?
                            }
                            NonNeg::try_from(val).map_err(|e| sql_err!("{e}"))
                        }).transpose()?,
                        topic_config: btreemap! {
                            "cleanup.policy".to_string() => "compact".to_string(),
                            "segment.bytes".to_string() => "134217728".to_string(), // 128 MB
                        },
                    },
                    options: BTreeMap::new(),
                    tls,
                    sasl,
                })
            }
            CreateConnectionType::Csr => {
                let url: reqwest::Url = match self.url {
                    Some(url) => url
                        .parse()
                        .map_err(|e| sql_err!("parsing schema registry url: {e}"))?,
                    None => sql_bail!("invalid CONNECTION: must specify URL"),
                };
                let _ = url
                    .host_str()
                    .ok_or_else(|| sql_err!("invalid CONNECTION: URL must specify domain name"))?;
                if url.path() != "/" {
                    sql_bail!("invalid CONNECTION: URL must have an empty path");
                }
                let cert = self.ssl_certificate;
                let key = self.ssl_key.map(|secret| secret.into());
                let tls_identity = match (cert, key) {
                    (None, None) => None,
                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
                    _ => sql_bail!(
                        "invalid CONNECTION: reading from SSL-auth Confluent Schema Registry requires both SSL KEY and SSL CERTIFICATE"
                    ),
                };
                let http_auth = self.user.map(|username| CsrConnectionHttpAuth {
                    username,
                    password: self.password.map(|secret| secret.into()),
                });

                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
                if let Some(privatelink) = self.aws_privatelink.as_ref() {
                    if privatelink.port.is_some() {
                        sql_bail!(
                            "invalid CONNECTION: CONFLUENT SCHEMA REGISTRY does not support PORT for AWS PRIVATELINK"
                        )
                    }
                }
                let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;

                ConnectionDetails::Csr(CsrConnection {
                    url,
                    tls_root_cert: self.ssl_certificate_authority,
                    tls_identity,
                    http_auth,
                    tunnel,
                })
            }
            CreateConnectionType::Postgres => {
                let cert = self.ssl_certificate;
                let key = self.ssl_key.map(|secret| secret.into());
                let tls_identity = match (cert, key) {
                    (None, None) => None,
                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
                    _ => sql_bail!(
                        "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
                    ),
                };
                let tls_mode = match self.ssl_mode.as_ref().map(|m| m.as_str()) {
                    None | Some("disable") => tokio_postgres::config::SslMode::Disable,
                    // "prefer" intentionally omitted because it has dubious security
                    // properties.
                    Some("require") | Some("required") => tokio_postgres::config::SslMode::Require,
                    Some("verify_ca") | Some("verify-ca") => {
                        tokio_postgres::config::SslMode::VerifyCa
                    }
                    Some("verify_full") | Some("verify-full") => {
                        tokio_postgres::config::SslMode::VerifyFull
                    }
                    Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
                };

                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
                if let Some(privatelink) = self.aws_privatelink.as_ref() {
                    if privatelink.port.is_some() {
                        sql_bail!(
                            "invalid CONNECTION: POSTGRES does not support PORT for AWS PRIVATELINK"
                        )
                    }
                }
                let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;

                ConnectionDetails::Postgres(PostgresConnection {
                    database: self
                        .database
                        .ok_or_else(|| sql_err!("DATABASE option is required"))?,
                    password: self.password.map(|password| password.into()),
                    host: self
                        .host
                        .ok_or_else(|| sql_err!("HOST option is required"))?,
                    port: self.port.unwrap_or(5432_u16),
                    tunnel,
                    tls_mode,
                    tls_root_cert: self.ssl_certificate_authority,
                    tls_identity,
                    user: self
                        .user
                        .ok_or_else(|| sql_err!("USER option is required"))?,
                })
            }
            CreateConnectionType::Ssh => {
                let ensure_key = |public_key| match public_key {
                    Some(public_key) => Ok::<_, anyhow::Error>(SshKey::PublicOnly(public_key)),
                    None => {
                        let key = SshKeyPair::new().context("creating SSH key")?;
                        Ok(SshKey::Both(key))
                    }
                };
                ConnectionDetails::Ssh {
                    connection: SshConnection {
                        host: self
                            .host
                            .ok_or_else(|| sql_err!("HOST option is required"))?,
                        port: self.port.unwrap_or(22_u16),
                        user: match self
                            .user
                            .ok_or_else(|| sql_err!("USER option is required"))?
                        {
                            StringOrSecret::String(user) => user,
                            StringOrSecret::Secret(_) => {
                                sql_bail!(
                                    "SSH connections do not support supplying USER value as SECRET"
                                )
                            }
                        },
                    },
                    key_1: ensure_key(self.public_key1)?,
                    key_2: ensure_key(self.public_key2)?,
                }
            }
            CreateConnectionType::MySql => {
                let aws_connection = get_aws_connection_reference(scx, &self)?;
                if aws_connection.is_some() && self.password.is_some() {
                    sql_bail!(
                        "invalid CONNECTION: AWS IAM authentication is not supported with password"
                    );
                }

                let cert = self.ssl_certificate;
                let key = self.ssl_key.map(|secret| secret.into());
                let tls_identity = match (cert, key) {
                    (None, None) => None,
                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
                    _ => sql_bail!(
                        "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
                    ),
                };
                // Accepts the same SSL Mode values as the MySQL Client
                // https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-mode
                let tls_mode = match self
                    .ssl_mode
                    .map(|f| f.to_uppercase())
                    .as_ref()
                    .map(|m| m.as_str())
                {
                    None | Some("DISABLED") => {
                        if aws_connection.is_some() {
                            sql_bail!(
                                "invalid CONNECTION: AWS IAM authentication requires SSL to be enabled"
                            )
                        }
                        MySqlSslMode::Disabled
                    }
                    // "preferred" intentionally omitted because it has dubious security
                    // properties.
                    Some("REQUIRED") | Some("REQUIRE") => MySqlSslMode::Required,
                    Some("VERIFY_CA") | Some("VERIFY-CA") => MySqlSslMode::VerifyCa,
                    Some("VERIFY_IDENTITY") | Some("VERIFY-IDENTITY") => {
                        MySqlSslMode::VerifyIdentity
                    }
                    Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
                };

                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
                if let Some(privatelink) = self.aws_privatelink.as_ref() {
                    if privatelink.port.is_some() {
                        sql_bail!(
                            "invalid CONNECTION: MYSQL does not support PORT for AWS PRIVATELINK"
                        )
                    }
                }
                let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;

                ConnectionDetails::MySql(MySqlConnection {
                    password: self.password.map(|password| password.into()),
                    host: self
                        .host
                        .ok_or_else(|| sql_err!("HOST option is required"))?,
                    port: self.port.unwrap_or(3306_u16),
                    tunnel,
                    tls_mode,
                    tls_root_cert: self.ssl_certificate_authority,
                    tls_identity,
                    user: self
                        .user
                        .ok_or_else(|| sql_err!("USER option is required"))?,
                    aws_connection,
                })
            }
            CreateConnectionType::SqlServer => {
                scx.require_feature_flag(&vars::ENABLE_SQL_SERVER_SOURCE)?;

                let aws_connection = get_aws_connection_reference(scx, &self)?;
                if aws_connection.is_some() && self.password.is_some() {
                    sql_bail!(
                        "invalid CONNECTION: AWS IAM authentication is not supported with password"
                    );
                }

                let (encryption, certificate_validation_policy) = match self
                    .ssl_mode
                    .map(|mode| mode.to_uppercase())
                    .as_ref()
                    .map(|mode| mode.as_str())
                {
                    None | Some("DISABLED") => (
                        mz_sql_server_util::config::EncryptionLevel::None,
                        mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
                    ),
                    Some("REQUIRED") => (
                        mz_sql_server_util::config::EncryptionLevel::Required,
                        mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
                    ),
                    Some("VERIFY") => (
                        mz_sql_server_util::config::EncryptionLevel::Required,
                        mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem,
                    ),
                    Some("VERIFY_CA") => {
                        if self.ssl_certificate_authority.is_none() {
                            sql_bail!(
                                "invalid CONNECTION: SSL MODE 'verify_ca' requires SSL CERTIFICATE AUTHORITY"
                            );
                        }
                        (
                            mz_sql_server_util::config::EncryptionLevel::Required,
                            mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA,
                        )
                    }
                    Some(mode) => {
                        sql_bail!("invalid CONNECTION: unknown SSL MODE {}", mode.quoted())
                    }
                };

                if let Some(privatelink) = self.aws_privatelink.as_ref() {
                    if privatelink.port.is_some() {
                        sql_bail!(
                            "invalid CONNECTION: SQL SERVER does not support PORT for AWS PRIVATELINK"
                        )
                    }
                }

                // 1433 is the default port for SQL Server instances running over TCP.
                //
                // See: <https://learn.microsoft.com/en-us/sql/database-engine/configure-windows/configure-a-server-to-listen-on-a-specific-tcp-port?view=sql-server-ver16>
                let port = self.port.unwrap_or(1433_u16);
                let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;

                ConnectionDetails::SqlServer(SqlServerConnectionDetails {
                    host: self
                        .host
                        .ok_or_else(|| sql_err!("HOST option is required"))?,
                    port,
                    database: self
                        .database
                        .ok_or_else(|| sql_err!("DATABASE option is required"))?,
                    user: self
                        .user
                        .ok_or_else(|| sql_err!("USER option is required"))?,
                    password: self
                        .password
                        .ok_or_else(|| sql_err!("PASSWORD option is required"))
                        .map(|pass| pass.into())?,
                    tunnel,
                    encryption,
                    certificate_validation_policy,
                    tls_root_cert: self.ssl_certificate_authority,
                })
            }
            CreateConnectionType::IcebergCatalog => {
                let catalog_type = self.catalog_type.clone().ok_or_else(|| {
                    sql_err!("invalid CONNECTION: ICEBERG connections must specify CATALOG TYPE")
                })?;

                let uri: reqwest::Url = match &self.url {
                    Some(url) => url
                        .parse()
                        .map_err(|e| sql_err!("parsing Iceberg catalog url: {e}"))?,
                    None => sql_bail!("invalid CONNECTION: must specify URL"),
                };

                let warehouse = self.warehouse.clone();
                let credential = self.credential.clone();
                let aws_connection = get_aws_connection_reference(scx, &self)?;

                let catalog = match catalog_type {
                    IcebergCatalogType::S3TablesRest => {
                        let Some(warehouse) = warehouse else {
                            sql_bail!(
                                "invalid CONNECTION: ICEBERG s3tablesrest connections must specify WAREHOUSE"
                            );
                        };
                        let Some(aws_connection) = aws_connection else {
                            sql_bail!(
                                "invalid CONNECTION: ICEBERG s3tablesrest connections require an AWS connection"
                            );
                        };

                        IcebergCatalogImpl::S3TablesRest(S3TablesRestIcebergCatalog {
                            aws_connection,
                            warehouse,
                        })
                    }
                    IcebergCatalogType::Rest => {
                        let Some(credential) = credential else {
                            sql_bail!(
                                "invalid CONNECTION: ICEBERG rest connections require a CREDENTIAL"
                            );
                        };

                        IcebergCatalogImpl::Rest(RestIcebergCatalog {
                            credential,
                            scope: self.scope.clone(),
                            warehouse,
                        })
                    }
                };

                ConnectionDetails::IcebergCatalog(IcebergCatalogConnection { catalog, uri })
            }
        };

        Ok(connection)
    }
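    /// Resolves the `BROKER`/`BROKERS` options into storage-layer broker definitions,
    /// planning any per-broker tunnel. Broker tunnels may be mixed and matched; a
    /// hypothetical example (syntax per the CREATE CONNECTION documentation):
    ///
    /// ```sql
    /// CREATE CONNECTION kafka_conn TO KAFKA (
    ///     BROKERS (
    ///         'broker0:9092' USING SSH TUNNEL ssh_bastion,
    ///         'broker1:9092' USING AWS PRIVATELINK privatelink_svc (PORT 9093)
    ///     )
    /// );
    /// ```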
    pub fn get_brokers(
        &self,
        scx: &StatementContext,
    ) -> Result<Vec<mz_storage_types::connections::KafkaBroker<ReferencedConnection>>, PlanError>
    {
        let mut brokers = match (&self.broker, &self.brokers, &self.aws_privatelink) {
            (Some(v), None, None) => v.to_vec(),
            (None, Some(v), None) => v.to_vec(),
            (None, None, Some(_)) => vec![],
            (None, None, None) => {
                sql_bail!("invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK")
            }
            _ => sql_bail!(
                "invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK"
            ),
        };

        // NOTE: we allow broker configurations to be mixed and matched. If/when we support
        // a top-level `SSH TUNNEL` configuration, we will need additional assertions.

        let mut out = vec![];
        for broker in &mut brokers {
            if broker.address.contains(',') {
                sql_bail!("invalid CONNECTION: cannot specify multiple Kafka broker addresses in one string.\n\n
Instead, specify BROKERS using multiple strings, e.g. BROKERS ('kafka:9092', 'kafka:9093')");
            }

            let tunnel = match &broker.tunnel {
                KafkaBrokerTunnel::Direct => Tunnel::Direct,
                KafkaBrokerTunnel::AwsPrivatelink(aws_privatelink) => {
                    let KafkaBrokerAwsPrivatelinkOptionExtracted {
                        availability_zone,
                        port,
                        seen: _,
                    } = KafkaBrokerAwsPrivatelinkOptionExtracted::try_from(
                        aws_privatelink.options.clone(),
                    )?;

                    let id = match &aws_privatelink.connection {
                        ResolvedItemName::Item { id, .. } => id,
                        _ => sql_bail!(
                            "internal error: Kafka PrivateLink connection was not resolved"
                        ),
                    };
                    let entry = scx.catalog.get_item(id);
                    match entry.connection()? {
                        Connection::AwsPrivatelink(connection) => {
                            if let Some(az) = &availability_zone {
                                if !connection.availability_zones.contains(az) {
                                    sql_bail!(
                                        "AWS PrivateLink availability zone {} does not match any of the \
                                      availability zones on the AWS PrivateLink connection {}",
                                        az.quoted(),
                                        scx.catalog
                                            .resolve_full_name(entry.name())
                                            .to_string()
                                            .quoted()
                                    )
                                }
                            }
                            Tunnel::AwsPrivatelink(AwsPrivatelink {
                                connection_id: *id,
                                availability_zone,
                                port,
                            })
                        }
                        _ => {
                            sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item)
                        }
                    }
                }
                KafkaBrokerTunnel::SshTunnel(ssh) => {
                    let id = match &ssh {
                        ResolvedItemName::Item { id, .. } => id,
                        _ => sql_bail!(
                            "internal error: Kafka SSH tunnel connection was not resolved"
                        ),
                    };
                    let ssh_tunnel = scx.catalog.get_item(id);
                    match ssh_tunnel.connection()? {
                        Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
                            connection_id: *id,
                            connection: *id,
                        }),
                        _ => {
                            sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item)
                        }
                    }
                }
            };

            out.push(mz_storage_types::connections::KafkaBroker {
                address: broker.address.clone(),
                tunnel,
            });
        }

        Ok(out)
    }
}
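// Resolves the `AWS CONNECTION` option to a reference to a previously created AWS
// connection, e.g. (hypothetical statements; the ARN is a placeholder):
//
//     CREATE CONNECTION aws_role TO AWS (ASSUME ROLE ARN = 'arn:aws:iam::123456789012:role/example');
//     CREATE CONNECTION kafka_msk TO KAFKA (
//         BROKER 'b-1.example.amazonaws.com:9098',
//         AWS CONNECTION = aws_role,
//         SECURITY PROTOCOL = 'SASL_SSL'
//     );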
fn get_aws_connection_reference(
    scx: &StatementContext,
    conn_options: &ConnectionOptionExtracted,
) -> Result<Option<AwsConnectionReference<ReferencedConnection>>, PlanError> {
    let Some(aws_connection_id) = conn_options.aws_connection else {
        return Ok(None);
    };

    let id = CatalogItemId::from(aws_connection_id);
    let item = scx.catalog.get_item(&id);
    Ok(match item.connection()? {
        Connection::Aws(_) => Some(AwsConnectionReference {
            connection_id: id,
            connection: id,
        }),
        _ => sql_bail!("{} is not an AWS connection", item.name().item),
    })
}
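// A sketch of the behavior below, using hypothetical statements: when no SECURITY
// PROTOCOL is given, the presence of any SASL option selects SASL_SSL and plain SSL
// is chosen otherwise, so
//
//     CREATE CONNECTION kafka_sasl TO KAFKA (
//         BROKER 'broker0:9092',
//         SASL MECHANISMS = 'SCRAM-SHA-256',
//         SASL USERNAME = 'materialize',
//         SASL PASSWORD = SECRET kafka_password
//     );
//
// plans TLS plus SASL authentication, while supplying only SSL KEY and SSL
// CERTIFICATE plans mutual-TLS authentication with no SASL.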
fn plan_kafka_security(
    scx: &StatementContext,
    v: &ConnectionOptionExtracted,
) -> Result<
    (
        Option<KafkaTlsConfig>,
        Option<KafkaSaslConfig<ReferencedConnection>>,
    ),
    PlanError,
> {
    const SASL_CONFIGS: [ConnectionOptionName; 4] = [
        ConnectionOptionName::AwsConnection,
        ConnectionOptionName::SaslMechanisms,
        ConnectionOptionName::SaslUsername,
        ConnectionOptionName::SaslPassword,
    ];

    const ALL_CONFIGS: [ConnectionOptionName; 7] = concat_arrays!(
        [
            ConnectionOptionName::SslKey,
            ConnectionOptionName::SslCertificate,
            ConnectionOptionName::SslCertificateAuthority,
        ],
        SASL_CONFIGS
    );

    enum SecurityProtocol {
        Plaintext,
        Ssl,
        SaslPlaintext,
        SaslSsl,
    }

    let security_protocol = v.security_protocol.as_ref().map(|v| v.to_uppercase());
    let security_protocol = match security_protocol.as_deref() {
        Some("PLAINTEXT") => SecurityProtocol::Plaintext,
        Some("SSL") => SecurityProtocol::Ssl,
        Some("SASL_PLAINTEXT") => SecurityProtocol::SaslPlaintext,
        Some("SASL_SSL") => SecurityProtocol::SaslSsl,
        Some(p) => sql_bail!("unknown security protocol: {}", p),
        // To be secure by default, if no security protocol is explicitly
        // specified, we always choose one of the SSL-enabled protocols, using
        // the presence of any SASL options to guide us between them. Users must
        // explicitly choose a plaintext mechanism if that's what they want.
        None if SASL_CONFIGS.iter().any(|c| v.seen.contains(c)) => SecurityProtocol::SaslSsl,
        None => SecurityProtocol::Ssl,
    };

    let mut outstanding = ALL_CONFIGS
        .into_iter()
        .filter(|c| v.seen.contains(c))
        .collect::<BTreeSet<ConnectionOptionName>>();

    let tls = match security_protocol {
        SecurityProtocol::Ssl | SecurityProtocol::SaslSsl => {
            outstanding.remove(&ConnectionOptionName::SslCertificate);
            let identity = match &v.ssl_certificate {
                None => None,
                Some(cert) => {
                    outstanding.remove(&ConnectionOptionName::SslKey);
                    let Some(key) = &v.ssl_key else {
                        sql_bail!("SSL KEY must be specified with SSL CERTIFICATE");
                    };
                    Some(TlsIdentity {
                        cert: cert.clone(),
                        key: (*key).into(),
                    })
                }
            };
            outstanding.remove(&ConnectionOptionName::SslCertificateAuthority);
            Some(KafkaTlsConfig {
                identity,
                root_cert: v.ssl_certificate_authority.clone(),
            })
        }
        _ => None,
    };

    let sasl = match security_protocol {
        SecurityProtocol::SaslPlaintext | SecurityProtocol::SaslSsl => {
            outstanding.remove(&ConnectionOptionName::AwsConnection);
            match get_aws_connection_reference(scx, v)? {
                Some(aws) => {
                    scx.require_feature_flag(&ENABLE_AWS_MSK_IAM_AUTH)?;
                    Some(KafkaSaslConfig {
                        mechanism: "OAUTHBEARER".into(),
                        username: "".into(),
                        password: None,
                        aws: Some(aws),
                    })
                }
                None => {
                    outstanding.remove(&ConnectionOptionName::SaslMechanisms);
                    outstanding.remove(&ConnectionOptionName::SaslUsername);
                    outstanding.remove(&ConnectionOptionName::SaslPassword);
                    // TODO(benesch): support a less confusing `SASL MECHANISM`
                    // alias, as only a single mechanism can be specified.
                    let Some(mechanism) = &v.sasl_mechanisms else {
                        sql_bail!("SASL MECHANISMS must be specified");
                    };
                    let Some(username) = &v.sasl_username else {
                        sql_bail!("SASL USERNAME must be specified");
                    };
                    let Some(password) = &v.sasl_password else {
                        sql_bail!("SASL PASSWORD must be specified");
                    };
                    Some(KafkaSaslConfig {
                        // librdkafka requires SASL mechanisms to be upper case (PLAIN,
                        // SCRAM-SHA-256). For usability, we automatically uppercase the
                        // mechanism that the user provides. This avoids a frustrating
                        // interaction with identifier case folding. Consider `SASL
                        // MECHANISMS = PLAIN`. Identifier case folding results in a
                        // SASL mechanism of `plain` (note the lowercase), which
                        // Materialize previously rejected with an error of "SASL
                        // mechanism must be uppercase." This was deeply frustrating for
                        // users who were not familiar with identifier case folding
                        // rules. See database-issues#6693.
                        mechanism: mechanism.to_uppercase(),
                        username: username.clone(),
                        password: Some((*password).into()),
                        aws: None,
                    })
                }
            }
        }
        _ => None,
    };

    if let Some(outstanding) = outstanding.first() {
        sql_bail!("option {outstanding} not supported with this configuration");
    }

    Ok((tls, sasl))
}