Skip to main content

mz_sql/plan/statement/ddl/
connection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data definition language (DDL) utilities for CONNECTION objects.
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use anyhow::Context;
15use array_concat::concat_arrays;
16use itertools::Itertools;
17use maplit::btreemap;
18use mz_ore::num::NonNeg;
19use mz_ore::str::StrExt;
20use mz_repr::CatalogItemId;
21use mz_sql_parser::ast::ConnectionOptionName::*;
22use mz_sql_parser::ast::display::AstDisplay;
23use mz_sql_parser::ast::{
24    ConnectionDefaultAwsPrivatelink, ConnectionOption, ConnectionOptionName, CreateConnectionType,
25    KafkaBroker, KafkaBrokerAwsPrivatelinkOption, KafkaBrokerAwsPrivatelinkOptionName,
26    KafkaBrokerTunnel, KafkaMatchingBrokerRule,
27};
28use mz_ssh_util::keys::SshKeyPair;
29use mz_storage_types::connections::aws::{
30    AwsAssumeRole, AwsAuth, AwsConnection, AwsConnectionReference, AwsCredentials,
31};
32use mz_storage_types::connections::inline::ReferencedConnection;
33use mz_storage_types::connections::string_or_secret::StringOrSecret;
34use mz_storage_types::connections::{
35    AwsPrivatelink, AwsPrivatelinkConnection, AwsPrivatelinkRule, CsrConnection,
36    CsrConnectionHttpAuth, IcebergCatalogConnection, IcebergCatalogImpl, IcebergCatalogType,
37    KafkaConnection, KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, MySqlConnection,
38    MySqlSslMode, PostgresConnection, RestIcebergCatalog, S3TablesRestIcebergCatalog,
39    SqlServerConnectionDetails, SshConnection, SshTunnel, TlsIdentity, Tunnel,
40};
41
42use crate::names::Aug;
43use crate::plan::statement::{Connection, ResolvedItemName};
44use crate::plan::with_options::{self};
45use crate::plan::{ConnectionDetails, PlanError, SshKey, StatementContext};
46use crate::session::vars;
47
// Declares the typed extraction of `ConnectionOption`s. Each `(Name, Type)` pair names a
// `ConnectionOptionName` variant and the Rust type its value is converted into. The generated
// `ConnectionOptionExtracted` (see `impl ConnectionOptionExtracted` in this file) exposes these
// as optional snake_case fields (e.g. `access_key_id`, `ssl_mode`, `port`) and records the
// option names that were actually written in `seen`.
generate_extracted_config!(
    ConnectionOption,
    (AccessKeyId, StringOrSecret),
    (AssumeRoleArn, String),
    (AssumeRoleSessionName, String),
    (AvailabilityZones, Vec<String>),
    (AwsConnection, with_options::Object),
    (AwsPrivatelink, ConnectionDefaultAwsPrivatelink<Aug>),
    // A single-broker form and a `BROKERS` list form; the list carries both static entries
    // and MATCHING rules (see `get_brokers_and_rules`).
    (Broker, Vec<KafkaBroker<Aug>>),
    (Brokers, with_options::BrokersList),
    (Credential, StringOrSecret),
    (Database, String),
    (Endpoint, String),
    (Host, String),
    (Password, with_options::Secret),
    (Port, u16),
    (ProgressTopic, String),
    // Signed so that non-positive values can be reported with a dedicated error message
    // during planning rather than failing the conversion.
    (ProgressTopicReplicationFactor, i32),
    (PublicKey1, String),
    (PublicKey2, String),
    (Region, String),
    (SaslMechanisms, String),
    (SaslPassword, with_options::Secret),
    (SaslUsername, StringOrSecret),
    (Scope, String),
    (SecretAccessKey, with_options::Secret),
    (SecurityProtocol, String),
    (ServiceName, String),
    (SshTunnel, with_options::Object),
    (SslCertificate, StringOrSecret),
    (SslCertificateAuthority, StringOrSecret),
    (SslKey, with_options::Secret),
    // Free-form string; each connection type parses and validates it separately.
    (SslMode, String),
    (SessionToken, StringOrSecret),
    // Iceberg catalog options.
    (CatalogType, IcebergCatalogType),
    (Url, String),
    (User, StringOrSecret),
    (Warehouse, String)
);
87
// Declares the typed extraction of `KafkaBrokerAwsPrivatelinkOption`s. Judging by the type name,
// these are the options attached to an AWS PRIVATELINK tunnel on an individual Kafka broker.
// NOTE(review): the consumer of the generated struct is not visible in this chunk; confirm.
generate_extracted_config!(
    KafkaBrokerAwsPrivatelinkOption,
    (AvailabilityZone, String),
    (Port, u16)
);
93
94/// Options which cannot be changed using ALTER CONNECTION.
95pub(crate) const INALTERABLE_OPTIONS: &[ConnectionOptionName] =
96    &[ProgressTopic, ProgressTopicReplicationFactor];
97
98/// Options of which only one may be specified.
99pub(crate) const MUTUALLY_EXCLUSIVE_SETS: &[&[ConnectionOptionName]] = &[&[Broker, Brokers]];
100
101pub(super) fn validate_options_per_connection_type(
102    t: CreateConnectionType,
103    mut options: BTreeSet<ConnectionOptionName>,
104) -> Result<(), PlanError> {
105    use mz_sql_parser::ast::ConnectionOptionName::*;
106    let permitted_options = match t {
107        CreateConnectionType::Aws => [
108            AccessKeyId,
109            SecretAccessKey,
110            SessionToken,
111            Endpoint,
112            Region,
113            AssumeRoleArn,
114            AssumeRoleSessionName,
115        ]
116        .as_slice(),
117        CreateConnectionType::AwsPrivatelink => &[AvailabilityZones, Port, ServiceName],
118        CreateConnectionType::Csr => &[
119            AwsPrivatelink,
120            Password,
121            Port,
122            SshTunnel,
123            SslCertificate,
124            SslCertificateAuthority,
125            SslKey,
126            Url,
127            User,
128        ],
129        CreateConnectionType::Kafka => &[
130            AwsConnection,
131            Broker,
132            Brokers,
133            ProgressTopic,
134            ProgressTopicReplicationFactor,
135            AwsPrivatelink,
136            SshTunnel,
137            SslKey,
138            SslCertificate,
139            SslCertificateAuthority,
140            SaslMechanisms,
141            SaslUsername,
142            SaslPassword,
143            SecurityProtocol,
144        ],
145        CreateConnectionType::Postgres => &[
146            AwsPrivatelink,
147            Database,
148            Host,
149            Password,
150            Port,
151            SshTunnel,
152            SslCertificate,
153            SslCertificateAuthority,
154            SslKey,
155            SslMode,
156            User,
157        ],
158        CreateConnectionType::Ssh => &[Host, Port, User, PublicKey1, PublicKey2],
159        CreateConnectionType::MySql => &[
160            AwsPrivatelink,
161            Host,
162            Password,
163            Port,
164            SshTunnel,
165            SslCertificate,
166            SslCertificateAuthority,
167            SslKey,
168            SslMode,
169            User,
170            AwsConnection,
171        ],
172        CreateConnectionType::SqlServer => &[
173            AwsPrivatelink,
174            Database,
175            Host,
176            Password,
177            Port,
178            SshTunnel,
179            SslCertificate,
180            SslCertificateAuthority,
181            SslKey,
182            SslMode,
183            User,
184        ],
185        CreateConnectionType::IcebergCatalog => &[
186            AwsConnection,
187            CatalogType,
188            Credential,
189            Scope,
190            Url,
191            Warehouse,
192        ],
193    };
194
195    for o in permitted_options {
196        options.remove(o);
197    }
198
199    if !options.is_empty() {
200        sql_bail!(
201            "{} connections do not support {} values",
202            t,
203            options.iter().join(", ")
204        )
205    }
206
207    Ok(())
208}
209
210impl ConnectionOptionExtracted {
211    pub(super) fn ensure_only_valid_options(
212        &self,
213        t: CreateConnectionType,
214    ) -> Result<(), PlanError> {
215        validate_options_per_connection_type(t, self.seen.clone())
216    }
217
218    pub fn try_into_connection_details(
219        self,
220        scx: &StatementContext,
221        connection_type: CreateConnectionType,
222    ) -> Result<ConnectionDetails, PlanError> {
223        self.ensure_only_valid_options(connection_type)?;
224
225        let connection: ConnectionDetails = match connection_type {
226            CreateConnectionType::Aws => {
227                let credentials = match (
228                    self.access_key_id,
229                    self.secret_access_key,
230                    self.session_token,
231                ) {
232                    (Some(access_key_id), Some(secret_access_key), session_token) => {
233                        Some(AwsCredentials {
234                            access_key_id,
235                            secret_access_key: secret_access_key.into(),
236                            session_token,
237                        })
238                    }
239                    (None, None, None) => None,
240                    _ => {
241                        sql_bail!(
242                            "must specify both ACCESS KEY ID and SECRET ACCESS KEY with optional SESSION TOKEN"
243                        );
244                    }
245                };
246
247                let assume_role = match (self.assume_role_arn, self.assume_role_session_name) {
248                    (Some(arn), session_name) => Some(AwsAssumeRole { arn, session_name }),
249                    (None, Some(_)) => {
250                        sql_bail!(
251                            "must specify ASSUME ROLE ARN with optional ASSUME ROLE SESSION NAME"
252                        );
253                    }
254                    _ => None,
255                };
256
257                let auth = match (credentials, assume_role) {
258                    (None, None) => sql_bail!(
259                        "must specify either ASSUME ROLE ARN or ACCESS KEY ID and SECRET ACCESS KEY"
260                    ),
261                    (Some(credentials), None) => AwsAuth::Credentials(credentials),
262                    (None, Some(assume_role)) => AwsAuth::AssumeRole(assume_role),
263                    (Some(_), Some(_)) => {
264                        sql_bail!("cannot specify both ACCESS KEY ID and ASSUME ROLE ARN");
265                    }
266                };
267
268                ConnectionDetails::Aws(AwsConnection {
269                    auth,
270                    endpoint: match self.endpoint {
271                        // TODO(benesch): this should not treat an empty endpoint as equivalent to a `NULL`
272                        // endpoint, but making that change now would break testdrive. AWS connections are
273                        // all behind feature flags mode right now, so no particular urgency to correct
274                        // this.
275                        Some(endpoint) if !endpoint.is_empty() => Some(endpoint),
276                        _ => None,
277                    },
278                    region: self.region,
279                })
280            }
281            CreateConnectionType::AwsPrivatelink => {
282                let connection = AwsPrivatelinkConnection {
283                    service_name: self
284                        .service_name
285                        .ok_or_else(|| sql_err!("SERVICE NAME option is required"))?,
286                    availability_zones: self
287                        .availability_zones
288                        .ok_or_else(|| sql_err!("AVAILABILITY ZONES option is required"))?,
289                };
290                if let Some(supported_azs) = scx.catalog.aws_privatelink_availability_zones() {
291                    let mut unique_azs: BTreeSet<String> = BTreeSet::new();
292                    let mut duplicate_azs: BTreeSet<String> = BTreeSet::new();
293                    // Validate each AZ is supported
294                    for connection_az in &connection.availability_zones {
295                        if unique_azs.contains(connection_az) {
296                            duplicate_azs.insert(connection_az.to_string());
297                        } else {
298                            unique_azs.insert(connection_az.to_string());
299                        }
300                        if !supported_azs.contains(connection_az) {
301                            return Err(PlanError::InvalidPrivatelinkAvailabilityZone {
302                                name: connection_az.to_string(),
303                                supported_azs,
304                            });
305                        }
306                    }
307                    if duplicate_azs.len() > 0 {
308                        return Err(PlanError::DuplicatePrivatelinkAvailabilityZone {
309                            duplicate_azs,
310                        });
311                    }
312                }
313                ConnectionDetails::AwsPrivatelink(connection)
314            }
315            CreateConnectionType::Kafka => {
316                let (tls, sasl) = plan_kafka_security(scx, &self)?;
317                let (static_brokers, matching_rules) = self.get_brokers_and_rules(scx)?;
318
319                if !matching_rules.is_empty() {
320                    scx.require_feature_flag(&vars::ENABLE_KAFKA_BROKER_MATCHING_RULES)?;
321                }
322
323                ConnectionDetails::Kafka(KafkaConnection {
324                    brokers: static_brokers,
325                    default_tunnel: build_tunnel_definition(
326                            scx,
327                            self.ssh_tunnel,
328                            self.aws_privatelink,
329                            if matching_rules.is_empty() { None } else { Some(matching_rules) },
330                        )?,
331                    progress_topic: self.progress_topic,
332                    progress_topic_options: KafkaTopicOptions {
333                        // We only allow configuring the progress topic replication factor for now.
334                        // For correctness, the partition count MUST be one and for performance the compaction
335                        // policy MUST be enabled.
336                        partition_count: Some(NonNeg::try_from(1).expect("1 is positive")),
337                        replication_factor: self.progress_topic_replication_factor.map(|val| {
338                            if val <= 0 {
339                                Err(sql_err!("invalid CONNECTION: PROGRESS TOPIC REPLICATION FACTOR must be greater than 0"))?
340                            }
341                            NonNeg::try_from(val).map_err(|e| sql_err!("{e}"))
342                        }).transpose()?,
343                        topic_config: btreemap! {
344                            "cleanup.policy".to_string() => "compact".to_string(),
345                            "segment.bytes".to_string() => "134217728".to_string(), // 128 MiB
346                        },
347                    },
348                    options: BTreeMap::new(),
349                    tls,
350                    sasl,
351                })
352            }
353            CreateConnectionType::Csr => {
354                let url: reqwest::Url = match self.url {
355                    Some(url) => url
356                        .parse()
357                        .map_err(|e| sql_err!("parsing schema registry url: {e}"))?,
358                    None => sql_bail!("invalid CONNECTION: must specify URL"),
359                };
360                let _ = url
361                    .host_str()
362                    .ok_or_else(|| sql_err!("invalid CONNECTION: URL must specify domain name"))?;
363                if url.path() != "/" {
364                    sql_bail!("invalid CONNECTION: URL must have an empty path");
365                }
366                let cert = self.ssl_certificate;
367                let key = self.ssl_key.map(|secret| secret.into());
368                let tls_identity = match (cert, key) {
369                    (None, None) => None,
370                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
371                    _ => sql_bail!(
372                        "invalid CONNECTION: reading from SSL-auth Confluent Schema Registry requires both SSL KEY and SSL CERTIFICATE"
373                    ),
374                };
375                let http_auth = self.user.map(|username| CsrConnectionHttpAuth {
376                    username,
377                    password: self.password.map(|secret| secret.into()),
378                });
379
380                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
381                if let Some(privatelink) = self.aws_privatelink.as_ref() {
382                    if privatelink.port.is_some() {
383                        sql_bail!(
384                            "invalid CONNECTION: CONFLUENT SCHEMA REGISTRY does not support PORT for AWS PRIVATELINK"
385                        )
386                    }
387                }
388                let tunnel = build_tunnel_definition(
389                    scx,
390                    self.ssh_tunnel,
391                    self.aws_privatelink,
392                    None, /* Rule-based PrivateLink is not supported for CSR. */
393                )?;
394
395                ConnectionDetails::Csr(CsrConnection {
396                    url,
397                    tls_root_cert: self.ssl_certificate_authority,
398                    tls_identity,
399                    http_auth,
400                    tunnel,
401                })
402            }
403            CreateConnectionType::Postgres => {
404                let cert = self.ssl_certificate;
405                let key = self.ssl_key.map(|secret| secret.into());
406                let tls_identity = match (cert, key) {
407                    (None, None) => None,
408                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
409                    _ => sql_bail!(
410                        "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
411                    ),
412                };
413                let tls_mode = match self.ssl_mode.as_ref().map(|m| m.as_str()) {
414                    None | Some("disable") => tokio_postgres::config::SslMode::Disable,
415                    // "prefer" intentionally omitted because it has dubious security
416                    // properties.
417                    Some("require") | Some("required") => tokio_postgres::config::SslMode::Require,
418                    Some("verify_ca") | Some("verify-ca") => {
419                        tokio_postgres::config::SslMode::VerifyCa
420                    }
421                    Some("verify_full") | Some("verify-full") => {
422                        tokio_postgres::config::SslMode::VerifyFull
423                    }
424                    Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
425                };
426
427                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
428                if let Some(privatelink) = self.aws_privatelink.as_ref() {
429                    if privatelink.port.is_some() {
430                        sql_bail!(
431                            "invalid CONNECTION: POSTGRES does not support PORT for AWS PRIVATELINK"
432                        )
433                    }
434                }
435                let tunnel = build_tunnel_definition(
436                    scx,
437                    self.ssh_tunnel,
438                    self.aws_privatelink,
439                    None, /* Rule-based PrivateLink is not supported for Postgres. */
440                )?;
441
442                ConnectionDetails::Postgres(PostgresConnection {
443                    database: self
444                        .database
445                        .ok_or_else(|| sql_err!("DATABASE option is required"))?,
446                    password: self.password.map(|password| password.into()),
447                    host: self
448                        .host
449                        .ok_or_else(|| sql_err!("HOST option is required"))?,
450                    port: self.port.unwrap_or(5432_u16),
451                    tunnel,
452                    tls_mode,
453                    tls_root_cert: self.ssl_certificate_authority,
454                    tls_identity,
455                    user: self
456                        .user
457                        .ok_or_else(|| sql_err!("USER option is required"))?,
458                })
459            }
460            CreateConnectionType::Ssh => {
461                let ensure_key = |public_key| match public_key {
462                    Some(public_key) => Ok::<_, anyhow::Error>(SshKey::PublicOnly(public_key)),
463                    None => {
464                        let key = SshKeyPair::new().context("creating SSH key")?;
465                        Ok(SshKey::Both(key))
466                    }
467                };
468                ConnectionDetails::Ssh {
469                    connection: SshConnection {
470                        host: self
471                            .host
472                            .ok_or_else(|| sql_err!("HOST option is required"))?,
473                        port: self.port.unwrap_or(22_u16),
474                        user: match self
475                            .user
476                            .ok_or_else(|| sql_err!("USER option is required"))?
477                        {
478                            StringOrSecret::String(user) => user,
479                            StringOrSecret::Secret(_) => {
480                                sql_bail!(
481                                    "SSH connections do not support supplying USER value as SECRET"
482                                )
483                            }
484                        },
485                    },
486                    key_1: ensure_key(self.public_key1)?,
487                    key_2: ensure_key(self.public_key2)?,
488                }
489            }
490            CreateConnectionType::MySql => {
491                let aws_connection = get_aws_connection_reference(scx, &self)?;
492                if aws_connection.is_some() && self.password.is_some() {
493                    sql_bail!(
494                        "invalid CONNECTION: AWS IAM authentication is not supported with password"
495                    );
496                }
497
498                let cert = self.ssl_certificate;
499                let key = self.ssl_key.map(|secret| secret.into());
500                let tls_identity = match (cert, key) {
501                    (None, None) => None,
502                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
503                    _ => sql_bail!(
504                        "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
505                    ),
506                };
507                // Accepts the same SSL Mode values as the MySQL Client
508                // https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-mode
509                let tls_mode = match self
510                    .ssl_mode
511                    .map(|f| f.to_uppercase())
512                    .as_ref()
513                    .map(|m| m.as_str())
514                {
515                    None | Some("DISABLED") => {
516                        if aws_connection.is_some() {
517                            sql_bail!(
518                                "invalid CONNECTION: AWS IAM authentication requires SSL to be enabled"
519                            )
520                        }
521                        MySqlSslMode::Disabled
522                    }
523                    // "preferred" intentionally omitted because it has dubious security
524                    // properties.
525                    Some("REQUIRED") | Some("REQUIRE") => MySqlSslMode::Required,
526                    Some("VERIFY_CA") | Some("VERIFY-CA") => MySqlSslMode::VerifyCa,
527                    Some("VERIFY_IDENTITY") | Some("VERIFY-IDENTITY") => {
528                        MySqlSslMode::VerifyIdentity
529                    }
530                    Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
531                };
532
533                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
534                if let Some(privatelink) = self.aws_privatelink.as_ref() {
535                    if privatelink.port.is_some() {
536                        sql_bail!(
537                            "invalid CONNECTION: MYSQL does not support PORT for AWS PRIVATELINK"
538                        )
539                    }
540                }
541                let tunnel = build_tunnel_definition(
542                    scx,
543                    self.ssh_tunnel,
544                    self.aws_privatelink,
545                    None, /* Rule-based PrivateLink is not supported for MySQL. */
546                )?;
547
548                ConnectionDetails::MySql(MySqlConnection {
549                    password: self.password.map(|password| password.into()),
550                    host: self
551                        .host
552                        .ok_or_else(|| sql_err!("HOST option is required"))?,
553                    port: self.port.unwrap_or(3306_u16),
554                    tunnel,
555                    tls_mode,
556                    tls_root_cert: self.ssl_certificate_authority,
557                    tls_identity,
558                    user: self
559                        .user
560                        .ok_or_else(|| sql_err!("USER option is required"))?,
561                    aws_connection,
562                })
563            }
564            CreateConnectionType::SqlServer => {
565                let aws_connection = get_aws_connection_reference(scx, &self)?;
566                if aws_connection.is_some() && self.password.is_some() {
567                    sql_bail!(
568                        "invalid CONNECTION: AWS IAM authentication is not supported with password"
569                    );
570                }
571
572                let (encryption, certificate_validation_policy) = match self
573                    .ssl_mode
574                    .map(|mode| mode.to_uppercase())
575                    .as_ref()
576                    .map(|mode| mode.as_str())
577                {
578                    None | Some("DISABLED") => (
579                        mz_sql_server_util::config::EncryptionLevel::None,
580                        mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
581                    ),
582                    Some("REQUIRED") => (
583                        mz_sql_server_util::config::EncryptionLevel::Required,
584                        mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
585                    ),
586                    Some("VERIFY") => (
587                        mz_sql_server_util::config::EncryptionLevel::Required,
588                        mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem,
589                    ),
590                    Some("VERIFY_CA") => {
591                        if self.ssl_certificate_authority.is_none() {
592                            sql_bail!(
593                                "invalid CONNECTION: SSL MODE 'verify_ca' requires SSL CERTIFICATE AUTHORITY"
594                            );
595                        }
596                        (
597                            mz_sql_server_util::config::EncryptionLevel::Required,
598                            mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA,
599                        )
600                    }
601                    Some(mode) => {
602                        sql_bail!("invalid CONNECTION: unknown SSL MODE {}", mode.quoted())
603                    }
604                };
605
606                if let Some(privatelink) = self.aws_privatelink.as_ref() {
607                    if privatelink.port.is_some() {
608                        sql_bail!(
609                            "invalid CONNECTION: SQL SERVER does not support PORT for AWS PRIVATELINK"
610                        )
611                    }
612                }
613
614                // 1433 is the default port for SQL Server instances running over TCP.
615                //
616                // See: <https://learn.microsoft.com/en-us/sql/database-engine/configure-windows/configure-a-server-to-listen-on-a-specific-tcp-port?view=sql-server-ver16>
617                let port = self.port.unwrap_or(1433_u16);
618                let tunnel = build_tunnel_definition(
619                    scx,
620                    self.ssh_tunnel,
621                    self.aws_privatelink,
622                    None, /* Rule-based PrivateLink is not supported for SQL Server. */
623                )?;
624
625                ConnectionDetails::SqlServer(SqlServerConnectionDetails {
626                    host: self
627                        .host
628                        .ok_or_else(|| sql_err!("HOST option is required"))?,
629                    port,
630                    database: self
631                        .database
632                        .ok_or_else(|| sql_err!("DATABASE option is required"))?,
633                    user: self
634                        .user
635                        .ok_or_else(|| sql_err!("USER option is required"))?,
636                    password: self
637                        .password
638                        .ok_or_else(|| sql_err!("PASSWORD option is required"))
639                        .map(|pass| pass.into())?,
640                    tunnel,
641                    encryption,
642                    certificate_validation_policy,
643                    tls_root_cert: self.ssl_certificate_authority,
644                })
645            }
646            CreateConnectionType::IcebergCatalog => {
647                let catalog_type = self.catalog_type.clone().ok_or_else(|| {
648                    sql_err!("invalid CONNECTION: ICEBERG connections must specify CATALOG TYPE")
649                })?;
650
651                let uri: reqwest::Url = match &self.url {
652                    Some(url) => url
653                        .parse()
654                        .map_err(|e| sql_err!("parsing Iceberg catalog url: {e}"))?,
655                    None => sql_bail!("invalid CONNECTION: must specify URL"),
656                };
657
658                let warehouse = self.warehouse.clone();
659                let credential = self.credential.clone();
660                let aws_connection = get_aws_connection_reference(scx, &self)?;
661
662                let catalog = match catalog_type {
663                    IcebergCatalogType::S3TablesRest => {
664                        let Some(warehouse) = warehouse else {
665                            sql_bail!(
666                                "invalid CONNECTION: ICEBERG s3tablesrest connections must specify WAREHOUSE"
667                            );
668                        };
669                        let Some(aws_connection) = aws_connection else {
670                            sql_bail!(
671                                "invalid CONNECTION: ICEBERG s3tablesrest connections require an AWS connection"
672                            );
673                        };
674
675                        IcebergCatalogImpl::S3TablesRest(S3TablesRestIcebergCatalog {
676                            aws_connection,
677                            warehouse,
678                        })
679                    }
680                    IcebergCatalogType::Rest => {
681                        let Some(credential) = credential else {
682                            sql_bail!(
683                                "invalid CONNECTION: ICEBERG rest connections require a CREDENTIAL"
684                            );
685                        };
686
687                        IcebergCatalogImpl::Rest(RestIcebergCatalog {
688                            credential,
689                            scope: self.scope.clone(),
690                            warehouse,
691                        })
692                    }
693                };
694
695                ConnectionDetails::IcebergCatalog(IcebergCatalogConnection { catalog, uri })
696            }
697        };
698
699        Ok(connection)
700    }
701
702    pub fn get_brokers_and_rules(
703        &self,
704        scx: &StatementContext,
705    ) -> Result<
706        (
707            Vec<mz_storage_types::connections::KafkaBroker<ReferencedConnection>>,
708            Vec<KafkaMatchingBrokerRule<Aug>>,
709        ),
710        PlanError,
711    > {
712        // Collect all static broker sources and matching rules.
713        let mut all_brokers: Vec<KafkaBroker<Aug>> = vec![];
714        let mut matching_rules: Vec<KafkaMatchingBrokerRule<Aug>> = vec![];
715
716        // Check exclusions and extract brokers
717        match (&self.broker, &self.brokers, &self.aws_privatelink) {
718            // BROKER
719            (Some(broker), None, None) => all_brokers.extend(broker.iter().cloned()),
720            // BROKERS
721            (None, Some(broker_list), None) => {
722                all_brokers.extend(broker_list.static_entries.iter().cloned());
723                matching_rules.extend(broker_list.matching_rules.iter().cloned());
724            }
725            // AWS PRIVATELINK
726            (None, None, Some(_privatelink)) => {
727                // Noting to do here
728            }
729            // noting - invalid
730            (None, None, None) => {
731                sql_bail!("invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK")
732            }
733            // exclusive keywords provided
734            _ => sql_bail!(
735                "invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK"
736            ),
737        };
738
739        // MATCHING rules require at least one static broker for bootstrapping.
740        if !matching_rules.is_empty() && all_brokers.is_empty() {
741            sql_bail!(
742                "invalid CONNECTION: BROKERS must contain at least one static broker address"
743            );
744        }
745
746        // NOTE: we allow broker configurations to be mixed and matched. If/when we support
747        // a top-level `SSH TUNNEL` configuration, we will need additional assertions.
748        let mut out = vec![];
749        for broker in &all_brokers {
750            if broker.address.contains(',') {
751                sql_bail!(
752                    "invalid CONNECTION: cannot specify multiple Kafka broker addresses in one string.\n\nInstead, specify BROKERS using multiple strings, e.g. BROKERS ('kafka:9092', 'kafka:9093')"
753                );
754            }
755
756            let tunnel = match &broker.tunnel {
757                KafkaBrokerTunnel::Direct => Tunnel::Direct,
758                KafkaBrokerTunnel::AwsPrivatelink(aws_privatelink) => {
759                    Tunnel::AwsPrivatelink(plan_privatelink(scx, aws_privatelink)?)
760                }
761                KafkaBrokerTunnel::SshTunnel(ssh) => {
762                    let id = match &ssh {
763                        ResolvedItemName::Item { id, .. } => id,
764                        _ => sql_bail!(
765                            "internal error: Kafka SSH tunnel connection was not resolved"
766                        ),
767                    };
768                    let ssh_tunnel = scx.catalog.get_item(id);
769                    match ssh_tunnel.connection()? {
770                        Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
771                            connection_id: *id,
772                            connection: *id,
773                        }),
774                        _ => {
775                            sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item)
776                        }
777                    }
778                }
779            };
780
781            out.push(mz_storage_types::connections::KafkaBroker {
782                address: broker.address.clone(),
783                tunnel,
784            });
785        }
786
787        Ok((out, matching_rules))
788    }
789}
790
791fn get_aws_connection_reference(
792    scx: &StatementContext,
793    conn_options: &ConnectionOptionExtracted,
794) -> Result<Option<AwsConnectionReference<ReferencedConnection>>, PlanError> {
795    let Some(aws_connection_id) = conn_options.aws_connection else {
796        return Ok(None);
797    };
798
799    let id = CatalogItemId::from(aws_connection_id);
800    let item = scx.catalog.get_item(&id);
801    Ok(match item.connection()? {
802        Connection::Aws(_) => Some(AwsConnectionReference {
803            connection_id: id,
804            connection: id,
805        }),
806        _ => sql_bail!("{} is not an AWS connection", item.name().item),
807    })
808}
809
810fn plan_kafka_security(
811    scx: &StatementContext,
812    v: &ConnectionOptionExtracted,
813) -> Result<
814    (
815        Option<KafkaTlsConfig>,
816        Option<KafkaSaslConfig<ReferencedConnection>>,
817    ),
818    PlanError,
819> {
820    const SASL_CONFIGS: [ConnectionOptionName; 4] = [
821        ConnectionOptionName::AwsConnection,
822        ConnectionOptionName::SaslMechanisms,
823        ConnectionOptionName::SaslUsername,
824        ConnectionOptionName::SaslPassword,
825    ];
826
827    const ALL_CONFIGS: [ConnectionOptionName; 7] = concat_arrays!(
828        [
829            ConnectionOptionName::SslKey,
830            ConnectionOptionName::SslCertificate,
831            ConnectionOptionName::SslCertificateAuthority,
832        ],
833        SASL_CONFIGS
834    );
835
836    enum SecurityProtocol {
837        Plaintext,
838        Ssl,
839        SaslPlaintext,
840        SaslSsl,
841    }
842
843    let security_protocol = v.security_protocol.as_ref().map(|v| v.to_uppercase());
844    let security_protocol = match security_protocol.as_deref() {
845        Some("PLAINTEXT") => SecurityProtocol::Plaintext,
846        Some("SSL") => SecurityProtocol::Ssl,
847        Some("SASL_PLAINTEXT") => SecurityProtocol::SaslPlaintext,
848        Some("SASL_SSL") => SecurityProtocol::SaslSsl,
849        Some(p) => sql_bail!("unknown security protocol: {}", p),
850        // To be secure by default, if no security protocol is explicitly
851        // specified, we always choose one of the SSL-enabled protocols, using
852        // the presence of any SASL options to guide us between them. Users must
853        // explicitly choose a plaintext mechanism if that's what they want.
854        None if SASL_CONFIGS.iter().any(|c| v.seen.contains(c)) => SecurityProtocol::SaslSsl,
855        None => SecurityProtocol::Ssl,
856    };
857
858    let mut outstanding = ALL_CONFIGS
859        .into_iter()
860        .filter(|c| v.seen.contains(c))
861        .collect::<BTreeSet<ConnectionOptionName>>();
862
863    let tls = match security_protocol {
864        SecurityProtocol::Ssl | SecurityProtocol::SaslSsl => {
865            outstanding.remove(&ConnectionOptionName::SslCertificate);
866            let identity = match &v.ssl_certificate {
867                None => None,
868                Some(cert) => {
869                    outstanding.remove(&ConnectionOptionName::SslKey);
870                    let Some(key) = &v.ssl_key else {
871                        sql_bail!("SSL KEY must be specified with SSL CERTIFICATE");
872                    };
873                    Some(TlsIdentity {
874                        cert: cert.clone(),
875                        key: (*key).into(),
876                    })
877                }
878            };
879            outstanding.remove(&ConnectionOptionName::SslCertificateAuthority);
880            Some(KafkaTlsConfig {
881                identity,
882                root_cert: v.ssl_certificate_authority.clone(),
883            })
884        }
885        _ => None,
886    };
887
888    let sasl = match security_protocol {
889        SecurityProtocol::SaslPlaintext | SecurityProtocol::SaslSsl => {
890            outstanding.remove(&ConnectionOptionName::AwsConnection);
891            match get_aws_connection_reference(scx, v)? {
892                Some(aws) => Some(KafkaSaslConfig {
893                    mechanism: "OAUTHBEARER".into(),
894                    username: "".into(),
895                    password: None,
896                    aws: Some(aws),
897                }),
898                None => {
899                    outstanding.remove(&ConnectionOptionName::SaslMechanisms);
900                    outstanding.remove(&ConnectionOptionName::SaslUsername);
901                    outstanding.remove(&ConnectionOptionName::SaslPassword);
902                    // TODO(benesch): support a less confusing `SASL MECHANISM`
903                    // alias, as only a single mechanism that can be specified.
904                    let Some(mechanism) = &v.sasl_mechanisms else {
905                        sql_bail!("SASL MECHANISMS must be specified");
906                    };
907                    let Some(username) = &v.sasl_username else {
908                        sql_bail!("SASL USERNAME must be specified");
909                    };
910                    let Some(password) = &v.sasl_password else {
911                        sql_bail!("SASL PASSWORD must be specified");
912                    };
913                    Some(KafkaSaslConfig {
914                        // librdkafka requires SASL mechanisms to be upper case (PLAIN,
915                        // SCRAM-SHA-256). For usability, we automatically uppercase the
916                        // mechanism that user provides. This avoids a frustrating
917                        // interaction with identifier case folding. Consider `SASL
918                        // MECHANISMS = PLAIN`. Identifier case folding results in a
919                        // SASL mechanism of `plain` (note the lowercase), which
920                        // Materialize previously rejected with an error of "SASL
921                        // mechanism must be uppercase." This was deeply frustarting for
922                        // users who were not familiar with identifier case folding
923                        // rules. See database-issues#6693.
924                        mechanism: mechanism.to_uppercase(),
925                        username: username.clone(),
926                        password: Some((*password).into()),
927                        aws: None,
928                    })
929                }
930            }
931        }
932        _ => None,
933    };
934
935    if let Some(outstanding) = outstanding.first() {
936        sql_bail!("option {outstanding} not supported with this configuration");
937    }
938
939    Ok((tls, sasl))
940}
941pub fn plan_default_privatelink(
942    scx: &StatementContext,
943    pl: &mz_sql_parser::ast::ConnectionDefaultAwsPrivatelink<Aug>,
944) -> Result<AwsPrivatelink, PlanError> {
945    let id = pl.connection.item_id().clone();
946    let entry = scx.catalog.get_item(&id);
947    match entry.connection()? {
948        Connection::AwsPrivatelink(_) => Ok(AwsPrivatelink {
949            connection_id: id,
950            // By default we do not specify an availability zone for the tunnel.
951            availability_zone: None,
952            // We always use the port as specified by the top-level connection.
953            port: pl.port,
954        }),
955        _ => sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item),
956    }
957}
958
959pub fn plan_privatelink(
960    scx: &StatementContext,
961    pl: &mz_sql_parser::ast::KafkaBrokerAwsPrivatelink<Aug>,
962) -> Result<AwsPrivatelink, PlanError> {
963    let KafkaBrokerAwsPrivatelinkOptionExtracted {
964        availability_zone,
965        port,
966        seen: _,
967    } = KafkaBrokerAwsPrivatelinkOptionExtracted::try_from(pl.options.clone())?;
968
969    let id = match &pl.connection {
970        ResolvedItemName::Item { id, .. } => id,
971        _ => sql_bail!("internal error: Kafka PrivateLink connection was not resolved"),
972    };
973    let entry = scx.catalog.get_item(id);
974    match entry.connection()? {
975        Connection::AwsPrivatelink(connection) => {
976            if let Some(az) = &availability_zone {
977                if !connection.availability_zones.contains(az) {
978                    sql_bail!(
979                        "AWS PrivateLink availability zone {} does not match any of the \
980                                      availability zones on the AWS PrivateLink connection {}",
981                        az.quoted(),
982                        scx.catalog
983                            .resolve_full_name(entry.name())
984                            .to_string()
985                            .quoted()
986                    )
987                }
988            }
989            Ok(AwsPrivatelink {
990                connection_id: *id,
991                availability_zone,
992                port,
993            })
994        }
995        _ => {
996            sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item)
997        }
998    }
999}
1000
1001pub(crate) fn build_tunnel_definition(
1002    scx: &StatementContext,
1003    ssh_tunnel: Option<with_options::Object>,
1004    aws_privatelink: Option<ConnectionDefaultAwsPrivatelink<Aug>>,
1005    matching_rules: Option<Vec<KafkaMatchingBrokerRule<Aug>>>,
1006) -> Result<Tunnel<ReferencedConnection>, PlanError> {
1007    Ok(match (ssh_tunnel, aws_privatelink, matching_rules) {
1008        (None, None, None) => Tunnel::Direct,
1009        (Some(ssh_tunnel), None, None) => {
1010            let id = CatalogItemId::from(ssh_tunnel);
1011            let ssh_tunnel = scx.catalog.get_item(&id);
1012            match ssh_tunnel.connection()? {
1013                Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
1014                    connection_id: id,
1015                    connection: id,
1016                }),
1017                _ => sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item),
1018            }
1019        }
1020        (None, Some(aws_privatelink), None) => {
1021            Tunnel::AwsPrivatelink(plan_default_privatelink(scx, &aws_privatelink)?)
1022        }
1023        (None, None, Some(rules)) => {
1024            if rules.is_empty() {
1025                sql_bail!("BROKERS MATCHING rules list cannot be empty");
1026            }
1027
1028            let rules = rules
1029                .iter()
1030                .map(|rule| {
1031                    Ok(AwsPrivatelinkRule {
1032                        pattern: rule.pattern.clone(),
1033                        to: plan_privatelink(scx, &rule.tunnel)?,
1034                    })
1035                })
1036                .collect::<Result<Vec<_>, PlanError>>()?;
1037            Tunnel::AwsPrivatelinks(mz_storage_types::connections::AwsPrivatelinks { rules })
1038        }
1039        _ => {
1040            sql_bail!("cannot specify both SSH TUNNEL and AWS PRIVATELINK");
1041        }
1042    })
1043}