// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Data definition language (DDL) utilities for CONNECTION objects.
12use std::collections::{BTreeMap, BTreeSet};
13
14use anyhow::Context;
15use array_concat::concat_arrays;
16use itertools::Itertools;
17use maplit::btreemap;
18use mz_ore::num::NonNeg;
19use mz_ore::str::StrExt;
20use mz_repr::CatalogItemId;
21use mz_sql_parser::ast::ConnectionOptionName::*;
22use mz_sql_parser::ast::display::AstDisplay;
23use mz_sql_parser::ast::{
24    ConnectionDefaultAwsPrivatelink, ConnectionOption, ConnectionOptionName, CreateConnectionType,
25    KafkaBroker, KafkaBrokerAwsPrivatelinkOption, KafkaBrokerAwsPrivatelinkOptionName,
26    KafkaBrokerTunnel, KafkaMatchingBrokerRule,
27};
28use mz_ssh_util::keys::SshKeyPair;
29use mz_storage_types::connections::aws::{
30    AwsAssumeRole, AwsAuth, AwsConnection, AwsConnectionReference, AwsCredentials,
31};
32use mz_storage_types::connections::inline::ReferencedConnection;
33use mz_storage_types::connections::string_or_secret::StringOrSecret;
34use mz_storage_types::connections::{
35    AwsPrivatelink, AwsPrivatelinkConnection, AwsPrivatelinkRule, CsrConnection,
36    CsrConnectionHttpAuth, IcebergCatalogConnection, IcebergCatalogImpl, IcebergCatalogType,
37    KafkaConnection, KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, MySqlConnection,
38    MySqlSslMode, PostgresConnection, RestIcebergCatalog, S3TablesRestIcebergCatalog,
39    SqlServerConnectionDetails, SshConnection, SshTunnel, TlsIdentity, Tunnel,
40};
41
42use crate::names::Aug;
43use crate::plan::statement::{Connection, ResolvedItemName};
44use crate::plan::with_options::{self};
45use crate::plan::{ConnectionDetails, PlanError, SshKey, StatementContext};
46use crate::session::vars::{self, ENABLE_AWS_MSK_IAM_AUTH};
47
// Generates `ConnectionOptionExtracted` (see the `impl` block below): a
// struct with one typed, snake_cased field per `CREATE CONNECTION` option
// listed here, plus a `seen` set recording which options the statement
// actually supplied.
generate_extracted_config!(
    ConnectionOption,
    (AccessKeyId, StringOrSecret),
    (AssumeRoleArn, String),
    (AssumeRoleSessionName, String),
    (AvailabilityZones, Vec<String>),
    (AwsConnection, with_options::Object),
    (AwsPrivatelink, ConnectionDefaultAwsPrivatelink<Aug>),
    (Broker, Vec<KafkaBroker<Aug>>),
    (Brokers, with_options::BrokersList),
    (Credential, StringOrSecret),
    (Database, String),
    (Endpoint, String),
    (Host, String),
    (Password, with_options::Secret),
    (Port, u16),
    (ProgressTopic, String),
    (ProgressTopicReplicationFactor, i32),
    (PublicKey1, String),
    (PublicKey2, String),
    (Region, String),
    (SaslMechanisms, String),
    (SaslPassword, with_options::Secret),
    (SaslUsername, StringOrSecret),
    (Scope, String),
    (SecretAccessKey, with_options::Secret),
    (SecurityProtocol, String),
    (ServiceName, String),
    (SshTunnel, with_options::Object),
    (SslCertificate, StringOrSecret),
    (SslCertificateAuthority, StringOrSecret),
    (SslKey, with_options::Secret),
    (SslMode, String),
    (SessionToken, StringOrSecret),
    (CatalogType, IcebergCatalogType),
    (Url, String),
    (User, StringOrSecret),
    (Warehouse, String)
);
87
// Generates the extracted-options struct for the AWS PrivateLink options
// that may be attached to an individual Kafka broker (`USING AWS
// PRIVATELINK (...)` in a `BROKER` clause).
generate_extracted_config!(
    KafkaBrokerAwsPrivatelinkOption,
    (AvailabilityZone, String),
    (Port, u16)
);
93
/// Options which cannot be changed using ALTER CONNECTION.
///
/// The progress topic settings are fixed when the connection is created; see
/// the Kafka arm of `try_into_connection_details`, which bakes them into the
/// connection's `progress_topic_options`.
pub(crate) const INALTERABLE_OPTIONS: &[ConnectionOptionName] =
    &[ProgressTopic, ProgressTopicReplicationFactor];
97
/// Options of which only one may be specified.
///
/// `BROKER` and `BROKERS` are two alternative spellings for naming a Kafka
/// connection's brokers, so a statement may use at most one of them.
pub(crate) const MUTUALLY_EXCLUSIVE_SETS: &[&[ConnectionOptionName]] = &[&[Broker, Brokers]];
100
101pub(super) fn validate_options_per_connection_type(
102    t: CreateConnectionType,
103    mut options: BTreeSet<ConnectionOptionName>,
104) -> Result<(), PlanError> {
105    use mz_sql_parser::ast::ConnectionOptionName::*;
106    let permitted_options = match t {
107        CreateConnectionType::Aws => [
108            AccessKeyId,
109            SecretAccessKey,
110            SessionToken,
111            Endpoint,
112            Region,
113            AssumeRoleArn,
114            AssumeRoleSessionName,
115        ]
116        .as_slice(),
117        CreateConnectionType::AwsPrivatelink => &[AvailabilityZones, Port, ServiceName],
118        CreateConnectionType::Csr => &[
119            AwsPrivatelink,
120            Password,
121            Port,
122            SshTunnel,
123            SslCertificate,
124            SslCertificateAuthority,
125            SslKey,
126            Url,
127            User,
128        ],
129        CreateConnectionType::Kafka => &[
130            AwsConnection,
131            Broker,
132            Brokers,
133            ProgressTopic,
134            ProgressTopicReplicationFactor,
135            AwsPrivatelink,
136            SshTunnel,
137            SslKey,
138            SslCertificate,
139            SslCertificateAuthority,
140            SaslMechanisms,
141            SaslUsername,
142            SaslPassword,
143            SecurityProtocol,
144        ],
145        CreateConnectionType::Postgres => &[
146            AwsPrivatelink,
147            Database,
148            Host,
149            Password,
150            Port,
151            SshTunnel,
152            SslCertificate,
153            SslCertificateAuthority,
154            SslKey,
155            SslMode,
156            User,
157        ],
158        CreateConnectionType::Ssh => &[Host, Port, User, PublicKey1, PublicKey2],
159        CreateConnectionType::MySql => &[
160            AwsPrivatelink,
161            Host,
162            Password,
163            Port,
164            SshTunnel,
165            SslCertificate,
166            SslCertificateAuthority,
167            SslKey,
168            SslMode,
169            User,
170            AwsConnection,
171        ],
172        CreateConnectionType::SqlServer => &[
173            AwsPrivatelink,
174            Database,
175            Host,
176            Password,
177            Port,
178            SshTunnel,
179            SslCertificate,
180            SslCertificateAuthority,
181            SslKey,
182            SslMode,
183            User,
184        ],
185        CreateConnectionType::IcebergCatalog => &[
186            AwsConnection,
187            CatalogType,
188            Credential,
189            Scope,
190            Url,
191            Warehouse,
192        ],
193    };
194
195    for o in permitted_options {
196        options.remove(o);
197    }
198
199    if !options.is_empty() {
200        sql_bail!(
201            "{} connections do not support {} values",
202            t,
203            options.iter().join(", ")
204        )
205    }
206
207    Ok(())
208}
209
210impl ConnectionOptionExtracted {
    /// Returns an error if any option recorded as seen on this statement is
    /// not valid for connections of type `t`.
    pub(super) fn ensure_only_valid_options(
        &self,
        t: CreateConnectionType,
    ) -> Result<(), PlanError> {
        // `seen` is cloned because the validator consumes the set while
        // checking it against the permitted options.
        validate_options_per_connection_type(t, self.seen.clone())
    }
217
    /// Converts the options extracted from a `CREATE CONNECTION` statement
    /// into typed [`ConnectionDetails`] for the given `connection_type`.
    ///
    /// First rejects any option the connection type does not permit, then
    /// enforces the type-specific constraints (required options, mutually
    /// exclusive options, feature flags) while assembling the details.
    ///
    /// # Errors
    ///
    /// Returns a [`PlanError`] if an unsupported option was supplied or if
    /// any type-specific validation fails.
    pub fn try_into_connection_details(
        self,
        scx: &StatementContext,
        connection_type: CreateConnectionType,
    ) -> Result<ConnectionDetails, PlanError> {
        self.ensure_only_valid_options(connection_type)?;

        let connection: ConnectionDetails = match connection_type {
            CreateConnectionType::Aws => {
                // Static credentials: ACCESS KEY ID and SECRET ACCESS KEY
                // must be supplied together; SESSION TOKEN is optional.
                let credentials = match (
                    self.access_key_id,
                    self.secret_access_key,
                    self.session_token,
                ) {
                    (Some(access_key_id), Some(secret_access_key), session_token) => {
                        Some(AwsCredentials {
                            access_key_id,
                            secret_access_key: secret_access_key.into(),
                            session_token,
                        })
                    }
                    (None, None, None) => None,
                    _ => {
                        sql_bail!(
                            "must specify both ACCESS KEY ID and SECRET ACCESS KEY with optional SESSION TOKEN"
                        );
                    }
                };

                // Role assumption: ASSUME ROLE SESSION NAME is only
                // meaningful alongside ASSUME ROLE ARN.
                let assume_role = match (self.assume_role_arn, self.assume_role_session_name) {
                    (Some(arn), session_name) => Some(AwsAssumeRole { arn, session_name }),
                    (None, Some(_)) => {
                        sql_bail!(
                            "must specify ASSUME ROLE ARN with optional ASSUME ROLE SESSION NAME"
                        );
                    }
                    _ => None,
                };

                // Exactly one authentication mechanism must be chosen:
                // static credentials or role assumption, never both.
                let auth = match (credentials, assume_role) {
                    (None, None) => sql_bail!(
                        "must specify either ASSUME ROLE ARN or ACCESS KEY ID and SECRET ACCESS KEY"
                    ),
                    (Some(credentials), None) => AwsAuth::Credentials(credentials),
                    (None, Some(assume_role)) => AwsAuth::AssumeRole(assume_role),
                    (Some(_), Some(_)) => {
                        sql_bail!("cannot specify both ACCESS KEY ID and ASSUME ROLE ARN");
                    }
                };

                ConnectionDetails::Aws(AwsConnection {
                    auth,
                    endpoint: match self.endpoint {
                        // TODO(benesch): this should not treat an empty endpoint as equivalent to a `NULL`
                        // endpoint, but making that change now would break testdrive. AWS connections are
                        // all behind feature flags mode right now, so no particular urgency to correct
                        // this.
                        Some(endpoint) if !endpoint.is_empty() => Some(endpoint),
                        _ => None,
                    },
                    region: self.region,
                })
            }
            CreateConnectionType::AwsPrivatelink => {
                // SERVICE NAME and AVAILABILITY ZONES are both mandatory.
                let connection = AwsPrivatelinkConnection {
                    service_name: self
                        .service_name
                        .ok_or_else(|| sql_err!("SERVICE NAME option is required"))?,
                    availability_zones: self
                        .availability_zones
                        .ok_or_else(|| sql_err!("AVAILABILITY ZONES option is required"))?,
                };
                // When the catalog knows the supported AZs, reject both
                // unsupported and duplicated AZs in the user-provided list.
                if let Some(supported_azs) = scx.catalog.aws_privatelink_availability_zones() {
                    let mut unique_azs: BTreeSet<String> = BTreeSet::new();
                    let mut duplicate_azs: BTreeSet<String> = BTreeSet::new();
                    // Validate each AZ is supported
                    for connection_az in &connection.availability_zones {
                        if unique_azs.contains(connection_az) {
                            duplicate_azs.insert(connection_az.to_string());
                        } else {
                            unique_azs.insert(connection_az.to_string());
                        }
                        if !supported_azs.contains(connection_az) {
                            return Err(PlanError::InvalidPrivatelinkAvailabilityZone {
                                name: connection_az.to_string(),
                                supported_azs,
                            });
                        }
                    }
                    if duplicate_azs.len() > 0 {
                        return Err(PlanError::DuplicatePrivatelinkAvailabilityZone {
                            duplicate_azs,
                        });
                    }
                }
                ConnectionDetails::AwsPrivatelink(connection)
            }
            CreateConnectionType::Kafka => {
                let (tls, sasl) = plan_kafka_security(scx, &self)?;
                let (static_brokers, matching_rules) = self.get_brokers_and_rules(scx)?;

                // Rule-based broker matching is gated behind a feature flag.
                if !matching_rules.is_empty() {
                    scx.require_feature_flag(&vars::ENABLE_KAFKA_BROKER_MATCHING_RULES)?;
                }

                ConnectionDetails::Kafka(KafkaConnection {
                    brokers: static_brokers,
                    default_tunnel: build_tunnel_definition(
                            scx,
                            self.ssh_tunnel,
                            self.aws_privatelink,
                            if matching_rules.is_empty() { None } else { Some(matching_rules) },
                        )?,
                    progress_topic: self.progress_topic,
                    progress_topic_options: KafkaTopicOptions {
                        // We only allow configuring the progress topic replication factor for now.
                        // For correctness, the partition count MUST be one and for performance the compaction
                        // policy MUST be enabled.
                        partition_count: Some(NonNeg::try_from(1).expect("1 is positive")),
                        replication_factor: self.progress_topic_replication_factor.map(|val| {
                            // `Err(..)?` returns the planning error from
                            // this closure; `transpose` then surfaces it.
                            if val <= 0 {
                                Err(sql_err!("invalid CONNECTION: PROGRESS TOPIC REPLICATION FACTOR must be greater than 0"))?
                            }
                            NonNeg::try_from(val).map_err(|e| sql_err!("{e}"))
                        }).transpose()?,
                        topic_config: btreemap! {
                            "cleanup.policy".to_string() => "compact".to_string(),
                            "segment.bytes".to_string() => "134217728".to_string(), // 128 MiB
                        },
                    },
                    options: BTreeMap::new(),
                    tls,
                    sasl,
                })
            }
            CreateConnectionType::Csr => {
                // The registry URL must parse, name a host, and carry no
                // path component.
                let url: reqwest::Url = match self.url {
                    Some(url) => url
                        .parse()
                        .map_err(|e| sql_err!("parsing schema registry url: {e}"))?,
                    None => sql_bail!("invalid CONNECTION: must specify URL"),
                };
                let _ = url
                    .host_str()
                    .ok_or_else(|| sql_err!("invalid CONNECTION: URL must specify domain name"))?;
                if url.path() != "/" {
                    sql_bail!("invalid CONNECTION: URL must have an empty path");
                }
                // SSL client auth requires both halves of the identity.
                let cert = self.ssl_certificate;
                let key = self.ssl_key.map(|secret| secret.into());
                let tls_identity = match (cert, key) {
                    (None, None) => None,
                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
                    _ => sql_bail!(
                        "invalid CONNECTION: reading from SSL-auth Confluent Schema Registry requires both SSL KEY and SSL CERTIFICATE"
                    ),
                };
                // HTTP basic auth is keyed on USER; PASSWORD is optional.
                let http_auth = self.user.map(|username| CsrConnectionHttpAuth {
                    username,
                    password: self.password.map(|secret| secret.into()),
                });

                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
                if let Some(privatelink) = self.aws_privatelink.as_ref() {
                    if privatelink.port.is_some() {
                        sql_bail!(
                            "invalid CONNECTION: CONFLUENT SCHEMA REGISTRY does not support PORT for AWS PRIVATELINK"
                        )
                    }
                }
                let tunnel = build_tunnel_definition(
                    scx,
                    self.ssh_tunnel,
                    self.aws_privatelink,
                    None, /* Rule-based PrivateLink is not supported for CSR. */
                )?;

                ConnectionDetails::Csr(CsrConnection {
                    url,
                    tls_root_cert: self.ssl_certificate_authority,
                    tls_identity,
                    http_auth,
                    tunnel,
                })
            }
            CreateConnectionType::Postgres => {
                // SSL client auth requires both halves of the identity.
                let cert = self.ssl_certificate;
                let key = self.ssl_key.map(|secret| secret.into());
                let tls_identity = match (cert, key) {
                    (None, None) => None,
                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
                    _ => sql_bail!(
                        "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
                    ),
                };
                // Map the SSL MODE string onto tokio-postgres's enum.
                let tls_mode = match self.ssl_mode.as_ref().map(|m| m.as_str()) {
                    None | Some("disable") => tokio_postgres::config::SslMode::Disable,
                    // "prefer" intentionally omitted because it has dubious security
                    // properties.
                    Some("require") | Some("required") => tokio_postgres::config::SslMode::Require,
                    Some("verify_ca") | Some("verify-ca") => {
                        tokio_postgres::config::SslMode::VerifyCa
                    }
                    Some("verify_full") | Some("verify-full") => {
                        tokio_postgres::config::SslMode::VerifyFull
                    }
                    Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
                };

                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
                if let Some(privatelink) = self.aws_privatelink.as_ref() {
                    if privatelink.port.is_some() {
                        sql_bail!(
                            "invalid CONNECTION: POSTGRES does not support PORT for AWS PRIVATELINK"
                        )
                    }
                }
                let tunnel = build_tunnel_definition(
                    scx,
                    self.ssh_tunnel,
                    self.aws_privatelink,
                    None, /* Rule-based PrivateLink is not supported for Postgres. */
                )?;

                ConnectionDetails::Postgres(PostgresConnection {
                    database: self
                        .database
                        .ok_or_else(|| sql_err!("DATABASE option is required"))?,
                    password: self.password.map(|password| password.into()),
                    host: self
                        .host
                        .ok_or_else(|| sql_err!("HOST option is required"))?,
                    // 5432 is the conventional PostgreSQL port.
                    port: self.port.unwrap_or(5432_u16),
                    tunnel,
                    tls_mode,
                    tls_root_cert: self.ssl_certificate_authority,
                    tls_identity,
                    user: self
                        .user
                        .ok_or_else(|| sql_err!("USER option is required"))?,
                })
            }
            CreateConnectionType::Ssh => {
                // If the user supplied a public key, store it as-is;
                // otherwise generate a fresh key pair for this connection.
                let ensure_key = |public_key| match public_key {
                    Some(public_key) => Ok::<_, anyhow::Error>(SshKey::PublicOnly(public_key)),
                    None => {
                        let key = SshKeyPair::new().context("creating SSH key")?;
                        Ok(SshKey::Both(key))
                    }
                };
                ConnectionDetails::Ssh {
                    connection: SshConnection {
                        host: self
                            .host
                            .ok_or_else(|| sql_err!("HOST option is required"))?,
                        // 22 is the conventional SSH port.
                        port: self.port.unwrap_or(22_u16),
                        user: match self
                            .user
                            .ok_or_else(|| sql_err!("USER option is required"))?
                        {
                            StringOrSecret::String(user) => user,
                            StringOrSecret::Secret(_) => {
                                sql_bail!(
                                    "SSH connections do not support supplying USER value as SECRET"
                                )
                            }
                        },
                    },
                    key_1: ensure_key(self.public_key1)?,
                    key_2: ensure_key(self.public_key2)?,
                }
            }
            CreateConnectionType::MySql => {
                // AWS IAM auth (via an AWS connection) and PASSWORD are
                // mutually exclusive authentication mechanisms.
                let aws_connection = get_aws_connection_reference(scx, &self)?;
                if aws_connection.is_some() && self.password.is_some() {
                    sql_bail!(
                        "invalid CONNECTION: AWS IAM authentication is not supported with password"
                    );
                }

                // SSL client auth requires both halves of the identity.
                let cert = self.ssl_certificate;
                let key = self.ssl_key.map(|secret| secret.into());
                let tls_identity = match (cert, key) {
                    (None, None) => None,
                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
                    _ => sql_bail!(
                        "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
                    ),
                };
                // Accepts the same SSL Mode values as the MySQL Client
                // https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-mode
                let tls_mode = match self
                    .ssl_mode
                    .map(|f| f.to_uppercase())
                    .as_ref()
                    .map(|m| m.as_str())
                {
                    None | Some("DISABLED") => {
                        // IAM auth tokens are only sent over TLS.
                        if aws_connection.is_some() {
                            sql_bail!(
                                "invalid CONNECTION: AWS IAM authentication requires SSL to be enabled"
                            )
                        }
                        MySqlSslMode::Disabled
                    }
                    // "preferred" intentionally omitted because it has dubious security
                    // properties.
                    Some("REQUIRED") | Some("REQUIRE") => MySqlSslMode::Required,
                    Some("VERIFY_CA") | Some("VERIFY-CA") => MySqlSslMode::VerifyCa,
                    Some("VERIFY_IDENTITY") | Some("VERIFY-IDENTITY") => {
                        MySqlSslMode::VerifyIdentity
                    }
                    Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
                };

                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
                if let Some(privatelink) = self.aws_privatelink.as_ref() {
                    if privatelink.port.is_some() {
                        sql_bail!(
                            "invalid CONNECTION: MYSQL does not support PORT for AWS PRIVATELINK"
                        )
                    }
                }
                let tunnel = build_tunnel_definition(
                    scx,
                    self.ssh_tunnel,
                    self.aws_privatelink,
                    None, /* Rule-based PrivateLink is not supported for MySQL. */
                )?;

                ConnectionDetails::MySql(MySqlConnection {
                    password: self.password.map(|password| password.into()),
                    host: self
                        .host
                        .ok_or_else(|| sql_err!("HOST option is required"))?,
                    // 3306 is the conventional MySQL port.
                    port: self.port.unwrap_or(3306_u16),
                    tunnel,
                    tls_mode,
                    tls_root_cert: self.ssl_certificate_authority,
                    tls_identity,
                    user: self
                        .user
                        .ok_or_else(|| sql_err!("USER option is required"))?,
                    aws_connection,
                })
            }
            CreateConnectionType::SqlServer => {
                // SQL Server sources are gated behind a feature flag.
                scx.require_feature_flag(&vars::ENABLE_SQL_SERVER_SOURCE)?;

                // AWS IAM auth and PASSWORD are mutually exclusive.
                let aws_connection = get_aws_connection_reference(scx, &self)?;
                if aws_connection.is_some() && self.password.is_some() {
                    sql_bail!(
                        "invalid CONNECTION: AWS IAM authentication is not supported with password"
                    );
                }

                // Map the SSL MODE string onto an (encryption level,
                // certificate validation policy) pair.
                let (encryption, certificate_validation_policy) = match self
                    .ssl_mode
                    .map(|mode| mode.to_uppercase())
                    .as_ref()
                    .map(|mode| mode.as_str())
                {
                    None | Some("DISABLED") => (
                        mz_sql_server_util::config::EncryptionLevel::None,
                        mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
                    ),
                    Some("REQUIRED") => (
                        mz_sql_server_util::config::EncryptionLevel::Required,
                        mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
                    ),
                    Some("VERIFY") => (
                        mz_sql_server_util::config::EncryptionLevel::Required,
                        mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem,
                    ),
                    Some("VERIFY_CA") => {
                        if self.ssl_certificate_authority.is_none() {
                            sql_bail!(
                                "invalid CONNECTION: SSL MODE 'verify_ca' requires SSL CERTIFICATE AUTHORITY"
                            );
                        }
                        (
                            mz_sql_server_util::config::EncryptionLevel::Required,
                            mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA,
                        )
                    }
                    Some(mode) => {
                        sql_bail!("invalid CONNECTION: unknown SSL MODE {}", mode.quoted())
                    }
                };

                if let Some(privatelink) = self.aws_privatelink.as_ref() {
                    if privatelink.port.is_some() {
                        sql_bail!(
                            "invalid CONNECTION: SQL SERVER does not support PORT for AWS PRIVATELINK"
                        )
                    }
                }

                // 1433 is the default port for SQL Server instances running over TCP.
                //
                // See: <https://learn.microsoft.com/en-us/sql/database-engine/configure-windows/configure-a-server-to-listen-on-a-specific-tcp-port?view=sql-server-ver16>
                let port = self.port.unwrap_or(1433_u16);
                let tunnel = build_tunnel_definition(
                    scx,
                    self.ssh_tunnel,
                    self.aws_privatelink,
                    None, /* Rule-based PrivateLink is not supported for SQL Server. */
                )?;

                ConnectionDetails::SqlServer(SqlServerConnectionDetails {
                    host: self
                        .host
                        .ok_or_else(|| sql_err!("HOST option is required"))?,
                    port,
                    database: self
                        .database
                        .ok_or_else(|| sql_err!("DATABASE option is required"))?,
                    user: self
                        .user
                        .ok_or_else(|| sql_err!("USER option is required"))?,
                    password: self
                        .password
                        .ok_or_else(|| sql_err!("PASSWORD option is required"))
                        .map(|pass| pass.into())?,
                    tunnel,
                    encryption,
                    certificate_validation_policy,
                    tls_root_cert: self.ssl_certificate_authority,
                })
            }
            CreateConnectionType::IcebergCatalog => {
                let catalog_type = self.catalog_type.clone().ok_or_else(|| {
                    sql_err!("invalid CONNECTION: ICEBERG connections must specify CATALOG TYPE")
                })?;

                // The catalog URL must parse as a URL.
                let uri: reqwest::Url = match &self.url {
                    Some(url) => url
                        .parse()
                        .map_err(|e| sql_err!("parsing Iceberg catalog url: {e}"))?,
                    None => sql_bail!("invalid CONNECTION: must specify URL"),
                };

                let warehouse = self.warehouse.clone();
                let credential = self.credential.clone();
                let aws_connection = get_aws_connection_reference(scx, &self)?;

                // Each catalog flavor has its own required options.
                let catalog = match catalog_type {
                    IcebergCatalogType::S3TablesRest => {
                        let Some(warehouse) = warehouse else {
                            sql_bail!(
                                "invalid CONNECTION: ICEBERG s3tablesrest connections must specify WAREHOUSE"
                            );
                        };
                        let Some(aws_connection) = aws_connection else {
                            sql_bail!(
                                "invalid CONNECTION: ICEBERG s3tablesrest connections require an AWS connection"
                            );
                        };

                        IcebergCatalogImpl::S3TablesRest(S3TablesRestIcebergCatalog {
                            aws_connection,
                            warehouse,
                        })
                    }
                    IcebergCatalogType::Rest => {
                        let Some(credential) = credential else {
                            sql_bail!(
                                "invalid CONNECTION: ICEBERG rest connections require a CREDENTIAL"
                            );
                        };

                        IcebergCatalogImpl::Rest(RestIcebergCatalog {
                            credential,
                            scope: self.scope.clone(),
                            warehouse,
                        })
                    }
                };

                ConnectionDetails::IcebergCatalog(IcebergCatalogConnection { catalog, uri })
            }
        };

        Ok(connection)
    }
703
    /// Extracts the Kafka broker addresses and `BROKERS ... MATCHING` rules
    /// from this connection's options.
    ///
    /// Exactly one of `BROKER`, `BROKERS`, or `AWS PRIVATELINK` may be set.
    /// Each static broker is lowered to a storage-layer broker with its
    /// per-broker tunnel (direct, AWS PrivateLink, or SSH) resolved against
    /// the catalog. Returns the lowered brokers together with any `MATCHING`
    /// rules, or a `PlanError` if the options are missing, mutually
    /// exclusive, or malformed.
    pub fn get_brokers_and_rules(
        &self,
        scx: &StatementContext,
    ) -> Result<
        (
            Vec<mz_storage_types::connections::KafkaBroker<ReferencedConnection>>,
            Vec<KafkaMatchingBrokerRule<Aug>>,
        ),
        PlanError,
    > {
        // Collect all static broker sources and matching rules.
        let mut all_brokers: Vec<KafkaBroker<Aug>> = vec![];
        let mut matching_rules: Vec<KafkaMatchingBrokerRule<Aug>> = vec![];

        // Check exclusions and extract brokers.
        match (&self.broker, &self.brokers, &self.aws_privatelink) {
            // BROKER
            (Some(broker), None, None) => all_brokers.extend(broker.iter().cloned()),
            // BROKERS
            (None, Some(broker_list), None) => {
                all_brokers.extend(broker_list.static_entries.iter().cloned());
                matching_rules.extend(broker_list.matching_rules.iter().cloned());
            }
            // AWS PRIVATELINK
            (None, None, Some(_privatelink)) => {
                // Nothing to do here: this case contributes no static brokers.
            }
            // None of the three options was provided - invalid.
            (None, None, None) => {
                sql_bail!("invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK")
            }
            // More than one of the mutually exclusive options was provided.
            _ => sql_bail!(
                "invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK"
            ),
        };

        // MATCHING rules require at least one static broker for bootstrapping.
        if !matching_rules.is_empty() && all_brokers.is_empty() {
            sql_bail!(
                "invalid CONNECTION: BROKERS must contain at least one static broker address"
            );
        }

        // NOTE: we allow broker configurations to be mixed and matched. If/when we support
        // a top-level `SSH TUNNEL` configuration, we will need additional assertions.
        let mut out = vec![];
        for broker in &all_brokers {
            // Each broker string must name exactly one address; reject
            // comma-separated lists with a pointer to the correct syntax.
            if broker.address.contains(',') {
                sql_bail!(
                    "invalid CONNECTION: cannot specify multiple Kafka broker addresses in one string.\n\nInstead, specify BROKERS using multiple strings, e.g. BROKERS ('kafka:9092', 'kafka:9093')"
                );
            }

            // Resolve the per-broker tunnel configuration.
            let tunnel = match &broker.tunnel {
                KafkaBrokerTunnel::Direct => Tunnel::Direct,
                KafkaBrokerTunnel::AwsPrivatelink(aws_privatelink) => {
                    Tunnel::AwsPrivatelink(plan_privatelink(scx, aws_privatelink)?)
                }
                KafkaBrokerTunnel::SshTunnel(ssh) => {
                    // Name resolution should have produced an `Item`; anything
                    // else indicates an internal planning bug.
                    let id = match &ssh {
                        ResolvedItemName::Item { id, .. } => id,
                        _ => sql_bail!(
                            "internal error: Kafka SSH tunnel connection was not resolved"
                        ),
                    };
                    let ssh_tunnel = scx.catalog.get_item(id);
                    // The referenced catalog item must actually be an SSH connection.
                    match ssh_tunnel.connection()? {
                        Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
                            connection_id: *id,
                            connection: *id,
                        }),
                        _ => {
                            sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item)
                        }
                    }
                }
            };

            out.push(mz_storage_types::connections::KafkaBroker {
                address: broker.address.clone(),
                tunnel,
            });
        }

        Ok((out, matching_rules))
    }
791}
792
793fn get_aws_connection_reference(
794    scx: &StatementContext,
795    conn_options: &ConnectionOptionExtracted,
796) -> Result<Option<AwsConnectionReference<ReferencedConnection>>, PlanError> {
797    let Some(aws_connection_id) = conn_options.aws_connection else {
798        return Ok(None);
799    };
800
801    let id = CatalogItemId::from(aws_connection_id);
802    let item = scx.catalog.get_item(&id);
803    Ok(match item.connection()? {
804        Connection::Aws(_) => Some(AwsConnectionReference {
805            connection_id: id,
806            connection: id,
807        }),
808        _ => sql_bail!("{} is not an AWS connection", item.name().item),
809    })
810}
811
/// Plans the TLS and SASL security configuration for a Kafka connection from
/// its extracted options.
///
/// The effective security protocol is either the one explicitly named via
/// `SECURITY PROTOCOL` or, when unspecified, inferred from which options were
/// provided (defaulting to an SSL-enabled protocol). Returns the TLS config
/// (for `SSL`/`SASL_SSL`) and the SASL config (for `SASL_PLAINTEXT`/
/// `SASL_SSL`), or an error if an option incompatible with the chosen
/// protocol was supplied.
fn plan_kafka_security(
    scx: &StatementContext,
    v: &ConnectionOptionExtracted,
) -> Result<
    (
        Option<KafkaTlsConfig>,
        Option<KafkaSaslConfig<ReferencedConnection>>,
    ),
    PlanError,
> {
    // Options that only make sense when a SASL protocol is in effect.
    const SASL_CONFIGS: [ConnectionOptionName; 4] = [
        ConnectionOptionName::AwsConnection,
        ConnectionOptionName::SaslMechanisms,
        ConnectionOptionName::SaslUsername,
        ConnectionOptionName::SaslPassword,
    ];

    // All security-related options: the SSL trio plus the SASL options above.
    const ALL_CONFIGS: [ConnectionOptionName; 7] = concat_arrays!(
        [
            ConnectionOptionName::SslKey,
            ConnectionOptionName::SslCertificate,
            ConnectionOptionName::SslCertificateAuthority,
        ],
        SASL_CONFIGS
    );

    enum SecurityProtocol {
        Plaintext,
        Ssl,
        SaslPlaintext,
        SaslSsl,
    }

    // Protocol names are matched case-insensitively.
    let security_protocol = v.security_protocol.as_ref().map(|v| v.to_uppercase());
    let security_protocol = match security_protocol.as_deref() {
        Some("PLAINTEXT") => SecurityProtocol::Plaintext,
        Some("SSL") => SecurityProtocol::Ssl,
        Some("SASL_PLAINTEXT") => SecurityProtocol::SaslPlaintext,
        Some("SASL_SSL") => SecurityProtocol::SaslSsl,
        Some(p) => sql_bail!("unknown security protocol: {}", p),
        // To be secure by default, if no security protocol is explicitly
        // specified, we always choose one of the SSL-enabled protocols, using
        // the presence of any SASL options to guide us between them. Users must
        // explicitly choose a plaintext mechanism if that's what they want.
        None if SASL_CONFIGS.iter().any(|c| v.seen.contains(c)) => SecurityProtocol::SaslSsl,
        None => SecurityProtocol::Ssl,
    };

    // `outstanding` starts as the set of security options the user actually
    // provided; each branch below removes the options it consumes. Anything
    // left at the end was provided but is invalid for the chosen protocol.
    let mut outstanding = ALL_CONFIGS
        .into_iter()
        .filter(|c| v.seen.contains(c))
        .collect::<BTreeSet<ConnectionOptionName>>();

    let tls = match security_protocol {
        SecurityProtocol::Ssl | SecurityProtocol::SaslSsl => {
            outstanding.remove(&ConnectionOptionName::SslCertificate);
            let identity = match &v.ssl_certificate {
                None => None,
                Some(cert) => {
                    // SSL KEY is only consumed when SSL CERTIFICATE is present,
                    // and is then mandatory.
                    outstanding.remove(&ConnectionOptionName::SslKey);
                    let Some(key) = &v.ssl_key else {
                        sql_bail!("SSL KEY must be specified with SSL CERTIFICATE");
                    };
                    Some(TlsIdentity {
                        cert: cert.clone(),
                        key: (*key).into(),
                    })
                }
            };
            outstanding.remove(&ConnectionOptionName::SslCertificateAuthority);
            Some(KafkaTlsConfig {
                identity,
                root_cert: v.ssl_certificate_authority.clone(),
            })
        }
        _ => None,
    };

    let sasl = match security_protocol {
        SecurityProtocol::SaslPlaintext | SecurityProtocol::SaslSsl => {
            outstanding.remove(&ConnectionOptionName::AwsConnection);
            match get_aws_connection_reference(scx, v)? {
                // AWS connection present: use IAM-based OAUTHBEARER auth
                // (feature-flagged) instead of username/password SASL.
                Some(aws) => {
                    scx.require_feature_flag(&ENABLE_AWS_MSK_IAM_AUTH)?;
                    Some(KafkaSaslConfig {
                        mechanism: "OAUTHBEARER".into(),
                        username: "".into(),
                        password: None,
                        aws: Some(aws),
                    })
                }
                None => {
                    outstanding.remove(&ConnectionOptionName::SaslMechanisms);
                    outstanding.remove(&ConnectionOptionName::SaslUsername);
                    outstanding.remove(&ConnectionOptionName::SaslPassword);
                    // TODO(benesch): support a less confusing `SASL MECHANISM`
                    // alias, as only a single mechanism can be specified.
                    let Some(mechanism) = &v.sasl_mechanisms else {
                        sql_bail!("SASL MECHANISMS must be specified");
                    };
                    let Some(username) = &v.sasl_username else {
                        sql_bail!("SASL USERNAME must be specified");
                    };
                    let Some(password) = &v.sasl_password else {
                        sql_bail!("SASL PASSWORD must be specified");
                    };
                    Some(KafkaSaslConfig {
                        // librdkafka requires SASL mechanisms to be upper case (PLAIN,
                        // SCRAM-SHA-256). For usability, we automatically uppercase the
                        // mechanism that user provides. This avoids a frustrating
                        // interaction with identifier case folding. Consider `SASL
                        // MECHANISMS = PLAIN`. Identifier case folding results in a
                        // SASL mechanism of `plain` (note the lowercase), which
                        // Materialize previously rejected with an error of "SASL
                        // mechanism must be uppercase." This was deeply frustrating for
                        // users who were not familiar with identifier case folding
                        // rules. See database-issues#6693.
                        mechanism: mechanism.to_uppercase(),
                        username: username.clone(),
                        password: Some((*password).into()),
                        aws: None,
                    })
                }
            }
        }
        _ => None,
    };

    // Reject the first (alphabetically, via BTreeSet ordering) option that no
    // branch consumed for the selected protocol.
    if let Some(outstanding) = outstanding.first() {
        sql_bail!("option {outstanding} not supported with this configuration");
    }

    Ok((tls, sasl))
}
946pub fn plan_default_privatelink(
947    scx: &StatementContext,
948    pl: &mz_sql_parser::ast::ConnectionDefaultAwsPrivatelink<Aug>,
949) -> Result<AwsPrivatelink, PlanError> {
950    let id = pl.connection.item_id().clone();
951    let entry = scx.catalog.get_item(&id);
952    match entry.connection()? {
953        Connection::AwsPrivatelink(_) => Ok(AwsPrivatelink {
954            connection_id: id,
955            // By default we do not specify an availability zone for the tunnel.
956            availability_zone: None,
957            // We always use the port as specified by the top-level connection.
958            port: pl.port,
959        }),
960        _ => sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item),
961    }
962}
963
964pub fn plan_privatelink(
965    scx: &StatementContext,
966    pl: &mz_sql_parser::ast::KafkaBrokerAwsPrivatelink<Aug>,
967) -> Result<AwsPrivatelink, PlanError> {
968    let KafkaBrokerAwsPrivatelinkOptionExtracted {
969        availability_zone,
970        port,
971        seen: _,
972    } = KafkaBrokerAwsPrivatelinkOptionExtracted::try_from(pl.options.clone())?;
973
974    let id = match &pl.connection {
975        ResolvedItemName::Item { id, .. } => id,
976        _ => sql_bail!("internal error: Kafka PrivateLink connection was not resolved"),
977    };
978    let entry = scx.catalog.get_item(id);
979    match entry.connection()? {
980        Connection::AwsPrivatelink(connection) => {
981            if let Some(az) = &availability_zone {
982                if !connection.availability_zones.contains(az) {
983                    sql_bail!(
984                        "AWS PrivateLink availability zone {} does not match any of the \
985                                      availability zones on the AWS PrivateLink connection {}",
986                        az.quoted(),
987                        scx.catalog
988                            .resolve_full_name(entry.name())
989                            .to_string()
990                            .quoted()
991                    )
992                }
993            }
994            Ok(AwsPrivatelink {
995                connection_id: *id,
996                availability_zone,
997                port,
998            })
999        }
1000        _ => {
1001            sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item)
1002        }
1003    }
1004}
1005
1006pub(crate) fn build_tunnel_definition(
1007    scx: &StatementContext,
1008    ssh_tunnel: Option<with_options::Object>,
1009    aws_privatelink: Option<ConnectionDefaultAwsPrivatelink<Aug>>,
1010    matching_rules: Option<Vec<KafkaMatchingBrokerRule<Aug>>>,
1011) -> Result<Tunnel<ReferencedConnection>, PlanError> {
1012    Ok(match (ssh_tunnel, aws_privatelink, matching_rules) {
1013        (None, None, None) => Tunnel::Direct,
1014        (Some(ssh_tunnel), None, None) => {
1015            let id = CatalogItemId::from(ssh_tunnel);
1016            let ssh_tunnel = scx.catalog.get_item(&id);
1017            match ssh_tunnel.connection()? {
1018                Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
1019                    connection_id: id,
1020                    connection: id,
1021                }),
1022                _ => sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item),
1023            }
1024        }
1025        (None, Some(aws_privatelink), None) => {
1026            Tunnel::AwsPrivatelink(plan_default_privatelink(scx, &aws_privatelink)?)
1027        }
1028        (None, None, Some(rules)) => {
1029            if rules.is_empty() {
1030                sql_bail!("BROKERS MATCHING rules list cannot be empty");
1031            }
1032
1033            let rules = rules
1034                .iter()
1035                .map(|rule| {
1036                    Ok(AwsPrivatelinkRule {
1037                        pattern: rule.pattern.clone(),
1038                        to: plan_privatelink(scx, &rule.tunnel)?,
1039                    })
1040                })
1041                .collect::<Result<Vec<_>, PlanError>>()?;
1042            Tunnel::AwsPrivatelinks(mz_storage_types::connections::AwsPrivatelinks { rules })
1043        }
1044        _ => {
1045            sql_bail!("cannot specify both SSH TUNNEL and AWS PRIVATELINK");
1046        }
1047    })
1048}