Skip to main content

mz_sql/plan/statement/ddl/connection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data definition language (DDL) utilities for CONNECTION objects.
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use anyhow::Context;
15use array_concat::concat_arrays;
16use itertools::Itertools;
17use maplit::btreemap;
18use mz_ore::num::NonNeg;
19use mz_ore::str::StrExt;
20use mz_repr::CatalogItemId;
21use mz_sql_parser::ast::ConnectionOptionName::*;
22use mz_sql_parser::ast::display::AstDisplay;
23use mz_sql_parser::ast::{
24    ConnectionDefaultAwsPrivatelink, ConnectionOption, ConnectionOptionName, CreateConnectionType,
25    KafkaBroker, KafkaBrokerAwsPrivatelinkOption, KafkaBrokerAwsPrivatelinkOptionName,
26    KafkaBrokerTunnel, KafkaMatchingBrokerRule,
27};
28use mz_ssh_util::keys::SshKeyPair;
29use mz_storage_types::connections::aws::{
30    AwsAssumeRole, AwsAuth, AwsConnection, AwsConnectionReference, AwsCredentials,
31};
32use mz_storage_types::connections::gcp::{GcpConnection, GcpConnectionReference};
33use mz_storage_types::connections::inline::ReferencedConnection;
34use mz_storage_types::connections::string_or_secret::StringOrSecret;
35use mz_storage_types::connections::{
36    AwsPrivatelink, AwsPrivatelinkConnection, AwsPrivatelinkRule, CsrConnection,
37    CsrConnectionHttpAuth, GlueSchemaRegistryConnection, IcebergCatalogAuth,
38    IcebergCatalogConnection, IcebergCatalogImpl, IcebergCatalogType, KafkaConnection,
39    KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, MySqlConnection, MySqlSslMode,
40    PostgresConnection, RestIcebergCatalog, S3TablesRestIcebergCatalog, SqlServerConnectionDetails,
41    SshConnection, SshTunnel, TlsIdentity, Tunnel,
42};
43
44use crate::names::Aug;
45use crate::plan::statement::{Connection, ResolvedItemName};
46use crate::plan::with_options::{self};
47use crate::plan::{ConnectionDetails, PlanError, SshKey, StatementContext};
48use crate::session::vars;
49
// Declares the typed view of the options that may appear in a connection's
// option list. Each `(Name, Type)` pair names a `ConnectionOptionName` variant
// and the type its value is converted into.
//
// The `impl ConnectionOptionExtracted` block that follows consumes the result:
// - it reads one `Option`-valued, snake_case field per entry (e.g.
//   `self.access_key_id`, `self.service_name`);
// - it reads `self.seen`, the set of option names that were actually supplied.
//
// NOTE(review): further details of the generated struct (derives, conversion
// error messages) live in the `generate_extracted_config!` definition, which is
// not visible here — confirm there before relying on them.
generate_extracted_config!(
    ConnectionOption,
    // `StringOrSecret` values can be either an inline string or a secret
    // reference; callers match on both forms (e.g. SSH rejects a secret USER).
    (AccessKeyId, StringOrSecret),
    (AssumeRoleArn, String),
    (AssumeRoleSessionName, String),
    (AvailabilityZones, Vec<String>),
    // Reference to another catalog object (an AWS connection).
    (AwsConnection, with_options::Object),
    (AwsPrivatelink, ConnectionDefaultAwsPrivatelink<Aug>),
    // `Broker` and `Brokers` are mutually exclusive; see
    // `MUTUALLY_EXCLUSIVE_SETS`.
    (Broker, Vec<KafkaBroker<Aug>>),
    (Brokers, with_options::BrokersList),
    (Credential, StringOrSecret),
    (Database, String),
    (Endpoint, String),
    // Reference to another catalog object (a GCP connection).
    (GcpConnection, with_options::Object),
    (Host, String),
    (Password, with_options::Secret),
    (Port, u16),
    // `ProgressTopic*` options cannot be altered later; see
    // `INALTERABLE_OPTIONS`.
    (ProgressTopic, String),
    (ProgressTopicReplicationFactor, i32),
    (PublicKey1, String),
    (PublicKey2, String),
    (Region, String),
    (Registry, String),
    (SaslMechanisms, String),
    (SaslPassword, with_options::Secret),
    (SaslUsername, StringOrSecret),
    (Scope, String),
    (SecretAccessKey, with_options::Secret),
    (SecurityProtocol, String),
    (ServiceAccountKey, with_options::Secret),
    (ServiceName, String),
    (SshTunnel, with_options::Object),
    (SslCertificate, StringOrSecret),
    (SslCertificateAuthority, StringOrSecret),
    (SslKey, with_options::Secret),
    // Kept as a raw string; each connection type parses and validates its
    // own set of accepted SSL MODE spellings.
    (SslMode, String),
    (SessionToken, StringOrSecret),
    (CatalogType, IcebergCatalogType),
    (Url, String),
    (User, StringOrSecret),
    (Warehouse, String)
);
92
// Declares the typed view of `KafkaBrokerAwsPrivatelinkOption`s: an
// availability zone (`String`) and a port (`u16`).
//
// NOTE(review): going by the type name, these are the options attached to an
// individual Kafka broker's AWS PrivateLink tunnel. The generated struct
// (presumably `KafkaBrokerAwsPrivatelinkOptionExtracted`) is consumed outside
// this view — confirm both against the macro and its callers.
generate_extracted_config!(
    KafkaBrokerAwsPrivatelinkOption,
    (AvailabilityZone, String),
    (Port, u16)
);
98
99/// Options which cannot be changed using ALTER CONNECTION.
100pub(crate) const INALTERABLE_OPTIONS: &[ConnectionOptionName] =
101    &[ProgressTopic, ProgressTopicReplicationFactor];
102
103/// Options of which only one may be specified.
104pub(crate) const MUTUALLY_EXCLUSIVE_SETS: &[&[ConnectionOptionName]] = &[&[Broker, Brokers]];
105
106pub(super) fn validate_options_per_connection_type(
107    t: CreateConnectionType,
108    mut options: BTreeSet<ConnectionOptionName>,
109) -> Result<(), PlanError> {
110    use mz_sql_parser::ast::ConnectionOptionName::*;
111    let permitted_options = match t {
112        CreateConnectionType::Aws => [
113            AccessKeyId,
114            SecretAccessKey,
115            SessionToken,
116            Endpoint,
117            Region,
118            AssumeRoleArn,
119            AssumeRoleSessionName,
120        ]
121        .as_slice(),
122        CreateConnectionType::AwsPrivatelink => &[AvailabilityZones, Port, ServiceName],
123        CreateConnectionType::GlueSchemaRegistry => &[AwsConnection, Registry],
124        CreateConnectionType::Gcp => &[ServiceAccountKey],
125        CreateConnectionType::Csr => &[
126            AwsPrivatelink,
127            Password,
128            Port,
129            SshTunnel,
130            SslCertificate,
131            SslCertificateAuthority,
132            SslKey,
133            Url,
134            User,
135        ],
136        CreateConnectionType::Kafka => &[
137            AwsConnection,
138            Broker,
139            Brokers,
140            ProgressTopic,
141            ProgressTopicReplicationFactor,
142            AwsPrivatelink,
143            SshTunnel,
144            SslKey,
145            SslCertificate,
146            SslCertificateAuthority,
147            SaslMechanisms,
148            SaslUsername,
149            SaslPassword,
150            SecurityProtocol,
151        ],
152        CreateConnectionType::Postgres => &[
153            AwsPrivatelink,
154            Database,
155            Host,
156            Password,
157            Port,
158            SshTunnel,
159            SslCertificate,
160            SslCertificateAuthority,
161            SslKey,
162            SslMode,
163            User,
164        ],
165        CreateConnectionType::Ssh => &[Host, Port, User, PublicKey1, PublicKey2],
166        CreateConnectionType::MySql => &[
167            AwsPrivatelink,
168            Host,
169            Password,
170            Port,
171            SshTunnel,
172            SslCertificate,
173            SslCertificateAuthority,
174            SslKey,
175            SslMode,
176            User,
177            AwsConnection,
178        ],
179        CreateConnectionType::SqlServer => &[
180            AwsPrivatelink,
181            Database,
182            Host,
183            Password,
184            Port,
185            SshTunnel,
186            SslCertificate,
187            SslCertificateAuthority,
188            SslKey,
189            SslMode,
190            User,
191        ],
192        CreateConnectionType::IcebergCatalog => &[
193            AwsConnection,
194            CatalogType,
195            Credential,
196            GcpConnection,
197            Scope,
198            Url,
199            Warehouse,
200        ],
201    };
202
203    for o in permitted_options {
204        options.remove(o);
205    }
206
207    if !options.is_empty() {
208        sql_bail!(
209            "{} connections do not support {} values",
210            t,
211            options.iter().join(", ")
212        )
213    }
214
215    Ok(())
216}
217
218impl ConnectionOptionExtracted {
219    pub(super) fn ensure_only_valid_options(
220        &self,
221        t: CreateConnectionType,
222    ) -> Result<(), PlanError> {
223        validate_options_per_connection_type(t, self.seen.clone())
224    }
225
226    pub fn try_into_connection_details(
227        self,
228        scx: &StatementContext,
229        connection_type: CreateConnectionType,
230    ) -> Result<ConnectionDetails, PlanError> {
231        self.ensure_only_valid_options(connection_type)?;
232
233        let connection: ConnectionDetails = match connection_type {
234            CreateConnectionType::Aws => {
235                let credentials = match (
236                    self.access_key_id,
237                    self.secret_access_key,
238                    self.session_token,
239                ) {
240                    (Some(access_key_id), Some(secret_access_key), session_token) => {
241                        Some(AwsCredentials {
242                            access_key_id,
243                            secret_access_key: secret_access_key.into(),
244                            session_token,
245                        })
246                    }
247                    (None, None, None) => None,
248                    _ => {
249                        sql_bail!(
250                            "must specify both ACCESS KEY ID and SECRET ACCESS KEY with optional SESSION TOKEN"
251                        );
252                    }
253                };
254
255                let assume_role = match (self.assume_role_arn, self.assume_role_session_name) {
256                    (Some(arn), session_name) => Some(AwsAssumeRole { arn, session_name }),
257                    (None, Some(_)) => {
258                        sql_bail!(
259                            "must specify ASSUME ROLE ARN with optional ASSUME ROLE SESSION NAME"
260                        );
261                    }
262                    _ => None,
263                };
264
265                let auth = match (credentials, assume_role) {
266                    (None, None) => sql_bail!(
267                        "must specify either ASSUME ROLE ARN or ACCESS KEY ID and SECRET ACCESS KEY"
268                    ),
269                    (Some(credentials), None) => AwsAuth::Credentials(credentials),
270                    (None, Some(assume_role)) => AwsAuth::AssumeRole(assume_role),
271                    (Some(_), Some(_)) => {
272                        sql_bail!("cannot specify both ACCESS KEY ID and ASSUME ROLE ARN");
273                    }
274                };
275
276                ConnectionDetails::Aws(AwsConnection {
277                    auth,
278                    endpoint: match self.endpoint {
279                        // TODO(benesch): this should not treat an empty endpoint as equivalent to a `NULL`
280                        // endpoint, but making that change now would break testdrive. AWS connections are
281                        // all behind feature flags mode right now, so no particular urgency to correct
282                        // this.
283                        Some(endpoint) if !endpoint.is_empty() => Some(endpoint),
284                        _ => None,
285                    },
286                    region: self.region,
287                })
288            }
289            CreateConnectionType::AwsPrivatelink => {
290                let connection = AwsPrivatelinkConnection {
291                    service_name: self
292                        .service_name
293                        .ok_or_else(|| sql_err!("SERVICE NAME option is required"))?,
294                    availability_zones: self
295                        .availability_zones
296                        .ok_or_else(|| sql_err!("AVAILABILITY ZONES option is required"))?,
297                };
298                if let Some(supported_azs) = scx.catalog.aws_privatelink_availability_zones() {
299                    let mut unique_azs: BTreeSet<String> = BTreeSet::new();
300                    let mut duplicate_azs: BTreeSet<String> = BTreeSet::new();
301                    // Validate each AZ is supported
302                    for connection_az in &connection.availability_zones {
303                        if unique_azs.contains(connection_az) {
304                            duplicate_azs.insert(connection_az.to_string());
305                        } else {
306                            unique_azs.insert(connection_az.to_string());
307                        }
308                        if !supported_azs.contains(connection_az) {
309                            return Err(PlanError::InvalidPrivatelinkAvailabilityZone {
310                                name: connection_az.to_string(),
311                                supported_azs,
312                            });
313                        }
314                    }
315                    if duplicate_azs.len() > 0 {
316                        return Err(PlanError::DuplicatePrivatelinkAvailabilityZone {
317                            duplicate_azs,
318                        });
319                    }
320                }
321                ConnectionDetails::AwsPrivatelink(connection)
322            }
323            CreateConnectionType::Gcp => {
324                let credentials_json = self
325                    .service_account_key
326                    .ok_or_else(|| sql_err!("SERVICE ACCOUNT KEY option is required"))?
327                    .into();
328                ConnectionDetails::Gcp(GcpConnection { credentials_json })
329            }
330            CreateConnectionType::Kafka => {
331                let (tls, sasl) = plan_kafka_security(scx, &self)?;
332                let (static_brokers, matching_rules) = self.get_brokers_and_rules(scx)?;
333
334                if !matching_rules.is_empty() {
335                    scx.require_feature_flag(&vars::ENABLE_KAFKA_BROKER_MATCHING_RULES)?;
336                }
337
338                ConnectionDetails::Kafka(KafkaConnection {
339                    brokers: static_brokers,
340                    default_tunnel: build_tunnel_definition(
341                            scx,
342                            self.ssh_tunnel,
343                            self.aws_privatelink,
344                            if matching_rules.is_empty() { None } else { Some(matching_rules) },
345                        )?,
346                    progress_topic: self.progress_topic,
347                    progress_topic_options: KafkaTopicOptions {
348                        // We only allow configuring the progress topic replication factor for now.
349                        // For correctness, the partition count MUST be one and for performance the compaction
350                        // policy MUST be enabled.
351                        partition_count: Some(NonNeg::try_from(1).expect("1 is positive")),
352                        replication_factor: self.progress_topic_replication_factor.map(|val| {
353                            if val <= 0 {
354                                Err(sql_err!("invalid CONNECTION: PROGRESS TOPIC REPLICATION FACTOR must be greater than 0"))?
355                            }
356                            NonNeg::try_from(val).map_err(|e| sql_err!("{e}"))
357                        }).transpose()?,
358                        topic_config: btreemap! {
359                            "cleanup.policy".to_string() => "compact".to_string(),
360                            "segment.bytes".to_string() => "134217728".to_string(), // 128 MiB
361                        },
362                    },
363                    options: BTreeMap::new(),
364                    tls,
365                    sasl,
366                })
367            }
368            CreateConnectionType::Csr => {
369                let url: reqwest::Url = match self.url {
370                    Some(url) => url
371                        .parse()
372                        .map_err(|e| sql_err!("parsing schema registry url: {e}"))?,
373                    None => sql_bail!("invalid CONNECTION: must specify URL"),
374                };
375                let _ = url
376                    .host_str()
377                    .ok_or_else(|| sql_err!("invalid CONNECTION: URL must specify domain name"))?;
378                if url.path() != "/" {
379                    sql_bail!("invalid CONNECTION: URL must have an empty path");
380                }
381                let cert = self.ssl_certificate;
382                let key = self.ssl_key.map(|secret| secret.into());
383                let tls_identity = match (cert, key) {
384                    (None, None) => None,
385                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
386                    _ => sql_bail!(
387                        "invalid CONNECTION: reading from SSL-auth Confluent Schema Registry requires both SSL KEY and SSL CERTIFICATE"
388                    ),
389                };
390                let http_auth = self.user.map(|username| CsrConnectionHttpAuth {
391                    username,
392                    password: self.password.map(|secret| secret.into()),
393                });
394
395                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
396                if let Some(privatelink) = self.aws_privatelink.as_ref() {
397                    if privatelink.port.is_some() {
398                        sql_bail!(
399                            "invalid CONNECTION: CONFLUENT SCHEMA REGISTRY does not support PORT for AWS PRIVATELINK"
400                        )
401                    }
402                }
403                let tunnel = build_tunnel_definition(
404                    scx,
405                    self.ssh_tunnel,
406                    self.aws_privatelink,
407                    None, /* Rule-based PrivateLink is not supported for CSR. */
408                )?;
409
410                ConnectionDetails::Csr(CsrConnection {
411                    url,
412                    tls_root_cert: self.ssl_certificate_authority,
413                    tls_identity,
414                    http_auth,
415                    tunnel,
416                })
417            }
418            CreateConnectionType::GlueSchemaRegistry => {
419                scx.require_feature_flag(&vars::ENABLE_GLUE_SCHEMA_REGISTRY)?;
420
421                let aws_connection = get_aws_connection_reference(scx, &self)?
422                    .ok_or_else(|| sql_err!("AWS CONNECTION option is required"))?;
423                let registry_name = self
424                    .registry
425                    .ok_or_else(|| sql_err!("REGISTRY option is required"))?;
426                if registry_name.is_empty() {
427                    sql_bail!("invalid CONNECTION: REGISTRY must not be empty");
428                }
429
430                ConnectionDetails::GlueSchemaRegistry(GlueSchemaRegistryConnection {
431                    aws_connection,
432                    registry_name,
433                })
434            }
435            CreateConnectionType::Postgres => {
436                let cert = self.ssl_certificate;
437                let key = self.ssl_key.map(|secret| secret.into());
438                let tls_identity = match (cert, key) {
439                    (None, None) => None,
440                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
441                    _ => sql_bail!(
442                        "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
443                    ),
444                };
445                let tls_mode = match self.ssl_mode.as_ref().map(|m| m.as_str()) {
446                    None | Some("disable") => tokio_postgres::config::SslMode::Disable,
447                    // "prefer" intentionally omitted because it has dubious security
448                    // properties.
449                    Some("require") | Some("required") => tokio_postgres::config::SslMode::Require,
450                    Some("verify_ca") | Some("verify-ca") => {
451                        tokio_postgres::config::SslMode::VerifyCa
452                    }
453                    Some("verify_full") | Some("verify-full") => {
454                        tokio_postgres::config::SslMode::VerifyFull
455                    }
456                    Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
457                };
458
459                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
460                if let Some(privatelink) = self.aws_privatelink.as_ref() {
461                    if privatelink.port.is_some() {
462                        sql_bail!(
463                            "invalid CONNECTION: POSTGRES does not support PORT for AWS PRIVATELINK"
464                        )
465                    }
466                }
467                let tunnel = build_tunnel_definition(
468                    scx,
469                    self.ssh_tunnel,
470                    self.aws_privatelink,
471                    None, /* Rule-based PrivateLink is not supported for Postgres. */
472                )?;
473
474                ConnectionDetails::Postgres(PostgresConnection {
475                    database: self
476                        .database
477                        .ok_or_else(|| sql_err!("DATABASE option is required"))?,
478                    password: self.password.map(|password| password.into()),
479                    host: self
480                        .host
481                        .ok_or_else(|| sql_err!("HOST option is required"))?,
482                    port: self.port.unwrap_or(5432_u16),
483                    tunnel,
484                    tls_mode,
485                    tls_root_cert: self.ssl_certificate_authority,
486                    tls_identity,
487                    user: self
488                        .user
489                        .ok_or_else(|| sql_err!("USER option is required"))?,
490                })
491            }
492            CreateConnectionType::Ssh => {
493                let ensure_key = |public_key| match public_key {
494                    Some(public_key) => Ok::<_, anyhow::Error>(SshKey::PublicOnly(public_key)),
495                    None => {
496                        let key = SshKeyPair::new().context("creating SSH key")?;
497                        Ok(SshKey::Both(key))
498                    }
499                };
500                ConnectionDetails::Ssh {
501                    connection: SshConnection {
502                        host: self
503                            .host
504                            .ok_or_else(|| sql_err!("HOST option is required"))?,
505                        port: self.port.unwrap_or(22_u16),
506                        user: match self
507                            .user
508                            .ok_or_else(|| sql_err!("USER option is required"))?
509                        {
510                            StringOrSecret::String(user) => user,
511                            StringOrSecret::Secret(_) => {
512                                sql_bail!(
513                                    "SSH connections do not support supplying USER value as SECRET"
514                                )
515                            }
516                        },
517                    },
518                    key_1: ensure_key(self.public_key1)?,
519                    key_2: ensure_key(self.public_key2)?,
520                }
521            }
522            CreateConnectionType::MySql => {
523                let aws_connection = get_aws_connection_reference(scx, &self)?;
524                if aws_connection.is_some() && self.password.is_some() {
525                    sql_bail!(
526                        "invalid CONNECTION: AWS IAM authentication is not supported with password"
527                    );
528                }
529
530                let cert = self.ssl_certificate;
531                let key = self.ssl_key.map(|secret| secret.into());
532                let tls_identity = match (cert, key) {
533                    (None, None) => None,
534                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
535                    _ => sql_bail!(
536                        "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
537                    ),
538                };
539                // Accepts the same SSL Mode values as the MySQL Client
540                // https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-mode
541                let tls_mode = match self
542                    .ssl_mode
543                    .map(|f| f.to_uppercase())
544                    .as_ref()
545                    .map(|m| m.as_str())
546                {
547                    None | Some("DISABLED") => {
548                        if aws_connection.is_some() {
549                            sql_bail!(
550                                "invalid CONNECTION: AWS IAM authentication requires SSL to be enabled"
551                            )
552                        }
553                        MySqlSslMode::Disabled
554                    }
555                    // "preferred" intentionally omitted because it has dubious security
556                    // properties.
557                    Some("REQUIRED") | Some("REQUIRE") => MySqlSslMode::Required,
558                    Some("VERIFY_CA") | Some("VERIFY-CA") => MySqlSslMode::VerifyCa,
559                    Some("VERIFY_IDENTITY") | Some("VERIFY-IDENTITY") => {
560                        MySqlSslMode::VerifyIdentity
561                    }
562                    Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
563                };
564
565                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
566                if let Some(privatelink) = self.aws_privatelink.as_ref() {
567                    if privatelink.port.is_some() {
568                        sql_bail!(
569                            "invalid CONNECTION: MYSQL does not support PORT for AWS PRIVATELINK"
570                        )
571                    }
572                }
573                let tunnel = build_tunnel_definition(
574                    scx,
575                    self.ssh_tunnel,
576                    self.aws_privatelink,
577                    None, /* Rule-based PrivateLink is not supported for MySQL. */
578                )?;
579
580                ConnectionDetails::MySql(MySqlConnection {
581                    password: self.password.map(|password| password.into()),
582                    host: self
583                        .host
584                        .ok_or_else(|| sql_err!("HOST option is required"))?,
585                    port: self.port.unwrap_or(3306_u16),
586                    tunnel,
587                    tls_mode,
588                    tls_root_cert: self.ssl_certificate_authority,
589                    tls_identity,
590                    user: self
591                        .user
592                        .ok_or_else(|| sql_err!("USER option is required"))?,
593                    aws_connection,
594                })
595            }
596            CreateConnectionType::SqlServer => {
597                let aws_connection = get_aws_connection_reference(scx, &self)?;
598                if aws_connection.is_some() && self.password.is_some() {
599                    sql_bail!(
600                        "invalid CONNECTION: AWS IAM authentication is not supported with password"
601                    );
602                }
603
604                let (encryption, certificate_validation_policy) = match self
605                    .ssl_mode
606                    .map(|mode| mode.to_uppercase())
607                    .as_ref()
608                    .map(|mode| mode.as_str())
609                {
610                    None | Some("DISABLED") => (
611                        mz_sql_server_util::config::EncryptionLevel::None,
612                        mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
613                    ),
614                    Some("REQUIRED") => (
615                        mz_sql_server_util::config::EncryptionLevel::Required,
616                        mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
617                    ),
618                    Some("VERIFY") => (
619                        mz_sql_server_util::config::EncryptionLevel::Required,
620                        mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem,
621                    ),
622                    Some("VERIFY_CA") => {
623                        if self.ssl_certificate_authority.is_none() {
624                            sql_bail!(
625                                "invalid CONNECTION: SSL MODE 'verify_ca' requires SSL CERTIFICATE AUTHORITY"
626                            );
627                        }
628                        (
629                            mz_sql_server_util::config::EncryptionLevel::Required,
630                            mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA,
631                        )
632                    }
633                    Some(mode) => {
634                        sql_bail!("invalid CONNECTION: unknown SSL MODE {}", mode.quoted())
635                    }
636                };
637
638                if let Some(privatelink) = self.aws_privatelink.as_ref() {
639                    if privatelink.port.is_some() {
640                        sql_bail!(
641                            "invalid CONNECTION: SQL SERVER does not support PORT for AWS PRIVATELINK"
642                        )
643                    }
644                }
645
646                // 1433 is the default port for SQL Server instances running over TCP.
647                //
648                // See: <https://learn.microsoft.com/en-us/sql/database-engine/configure-windows/configure-a-server-to-listen-on-a-specific-tcp-port?view=sql-server-ver16>
649                let port = self.port.unwrap_or(1433_u16);
650                let tunnel = build_tunnel_definition(
651                    scx,
652                    self.ssh_tunnel,
653                    self.aws_privatelink,
654                    None, /* Rule-based PrivateLink is not supported for SQL Server. */
655                )?;
656
657                ConnectionDetails::SqlServer(SqlServerConnectionDetails {
658                    host: self
659                        .host
660                        .ok_or_else(|| sql_err!("HOST option is required"))?,
661                    port,
662                    database: self
663                        .database
664                        .ok_or_else(|| sql_err!("DATABASE option is required"))?,
665                    user: self
666                        .user
667                        .ok_or_else(|| sql_err!("USER option is required"))?,
668                    password: self
669                        .password
670                        .ok_or_else(|| sql_err!("PASSWORD option is required"))
671                        .map(|pass| pass.into())?,
672                    tunnel,
673                    encryption,
674                    certificate_validation_policy,
675                    tls_root_cert: self.ssl_certificate_authority,
676                })
677            }
678            CreateConnectionType::IcebergCatalog => {
679                let catalog_type = self.catalog_type.clone().ok_or_else(|| {
680                    sql_err!("invalid CONNECTION: ICEBERG connections must specify CATALOG TYPE")
681                })?;
682
683                let uri: reqwest::Url = match &self.url {
684                    Some(url) => url
685                        .parse()
686                        .map_err(|e| sql_err!("parsing Iceberg catalog url: {e}"))?,
687                    None => sql_bail!("invalid CONNECTION: must specify URL"),
688                };
689
690                let warehouse = self.warehouse.clone();
691                let credential = self.credential.clone();
692                let aws_connection = get_aws_connection_reference(scx, &self)?;
693                let gcp_connection = get_gcp_connection_reference(scx, &self)?;
694
695                let catalog = match catalog_type {
696                    IcebergCatalogType::S3TablesRest => {
697                        if gcp_connection.is_some() {
698                            sql_bail!(
699                                "invalid CONNECTION: ICEBERG s3tablesrest connections do not support GCP CONNECTION"
700                            );
701                        }
702                        let Some(warehouse) = warehouse else {
703                            sql_bail!(
704                                "invalid CONNECTION: ICEBERG s3tablesrest connections must specify WAREHOUSE"
705                            );
706                        };
707                        let Some(aws_connection) = aws_connection else {
708                            sql_bail!(
709                                "invalid CONNECTION: ICEBERG s3tablesrest connections require an AWS connection"
710                            );
711                        };
712
713                        IcebergCatalogImpl::S3TablesRest(S3TablesRestIcebergCatalog {
714                            aws_connection,
715                            warehouse,
716                        })
717                    }
718                    IcebergCatalogType::Rest => {
719                        if aws_connection.is_some() {
720                            sql_bail!(
721                                "invalid CONNECTION: ICEBERG rest connections do not support AWS CONNECTION.\n\nTry s3tablesrest instead."
722                            );
723                        }
724                        let auth = match (credential, gcp_connection) {
725                            (Some(_), Some(_)) => sql_bail!(
726                                "invalid CONNECTION: ICEBERG rest connections may set CREDENTIAL or GCP CONNECTION, not both"
727                            ),
728                            (Some(credential), None) => IcebergCatalogAuth::OAuth {
729                                credential,
730                                scope: self.scope.clone(),
731                            },
732                            (None, Some(gcp_connection)) => {
733                                /// All BigLake Iceberg REST Catalogs use the same catalog URI.
734                                const BIGLAKE_CATALOG_URI: &str =
735                                    "https://biglake.googleapis.com/iceberg/v1/restcatalog";
736                                if uri.to_string() != BIGLAKE_CATALOG_URI {
737                                    sql_bail!(
738                                        "GCP connection can only be used with '{}'",
739                                        BIGLAKE_CATALOG_URI
740                                    );
741                                }
742                                IcebergCatalogAuth::Gcp(gcp_connection)
743                            }
744                            (None, None) => sql_bail!(
745                                "invalid CONNECTION: ICEBERG rest connections require a CREDENTIAL or GCP CONNECTION"
746                            ),
747                        };
748
749                        IcebergCatalogImpl::Rest(RestIcebergCatalog { auth, warehouse })
750                    }
751                };
752
753                ConnectionDetails::IcebergCatalog(IcebergCatalogConnection { catalog, uri })
754            }
755        };
756
757        Ok(connection)
758    }
759
760    pub fn get_brokers_and_rules(
761        &self,
762        scx: &StatementContext,
763    ) -> Result<
764        (
765            Vec<mz_storage_types::connections::KafkaBroker<ReferencedConnection>>,
766            Vec<KafkaMatchingBrokerRule<Aug>>,
767        ),
768        PlanError,
769    > {
770        // Collect all static broker sources and matching rules.
771        let mut all_brokers: Vec<KafkaBroker<Aug>> = vec![];
772        let mut matching_rules: Vec<KafkaMatchingBrokerRule<Aug>> = vec![];
773
774        // Check exclusions and extract brokers
775        match (&self.broker, &self.brokers, &self.aws_privatelink) {
776            // BROKER
777            (Some(broker), None, None) => all_brokers.extend(broker.iter().cloned()),
778            // BROKERS
779            (None, Some(broker_list), None) => {
780                all_brokers.extend(broker_list.static_entries.iter().cloned());
781                matching_rules.extend(broker_list.matching_rules.iter().cloned());
782            }
783            // AWS PRIVATELINK
784            (None, None, Some(_privatelink)) => {
785                // Noting to do here
786            }
787            // noting - invalid
788            (None, None, None) => {
789                sql_bail!("invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK")
790            }
791            // exclusive keywords provided
792            _ => sql_bail!(
793                "invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK"
794            ),
795        };
796
797        // MATCHING rules require at least one static broker for bootstrapping.
798        if !matching_rules.is_empty() && all_brokers.is_empty() {
799            sql_bail!(
800                "invalid CONNECTION: BROKERS must contain at least one static broker address"
801            );
802        }
803
804        // NOTE: we allow broker configurations to be mixed and matched. If/when we support
805        // a top-level `SSH TUNNEL` configuration, we will need additional assertions.
806        let mut out = vec![];
807        for broker in &all_brokers {
808            if broker.address.contains(',') {
809                sql_bail!(
810                    "invalid CONNECTION: cannot specify multiple Kafka broker addresses in one string.\n\nInstead, specify BROKERS using multiple strings, e.g. BROKERS ('kafka:9092', 'kafka:9093')"
811                );
812            }
813
814            let tunnel = match &broker.tunnel {
815                KafkaBrokerTunnel::Direct => Tunnel::Direct,
816                KafkaBrokerTunnel::AwsPrivatelink(aws_privatelink) => {
817                    Tunnel::AwsPrivatelink(plan_privatelink(scx, aws_privatelink)?)
818                }
819                KafkaBrokerTunnel::SshTunnel(ssh) => {
820                    let id = match &ssh {
821                        ResolvedItemName::Item { id, .. } => id,
822                        _ => sql_bail!(
823                            "internal error: Kafka SSH tunnel connection was not resolved"
824                        ),
825                    };
826                    let ssh_tunnel = scx.catalog.get_item(id);
827                    match ssh_tunnel.connection()? {
828                        Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
829                            connection_id: *id,
830                            connection: *id,
831                        }),
832                        _ => {
833                            sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item)
834                        }
835                    }
836                }
837            };
838
839            out.push(mz_storage_types::connections::KafkaBroker {
840                address: broker.address.clone(),
841                tunnel,
842            });
843        }
844
845        Ok((out, matching_rules))
846    }
847}
848
849fn get_aws_connection_reference(
850    scx: &StatementContext,
851    conn_options: &ConnectionOptionExtracted,
852) -> Result<Option<AwsConnectionReference<ReferencedConnection>>, PlanError> {
853    let Some(aws_connection_id) = conn_options.aws_connection else {
854        return Ok(None);
855    };
856
857    let id = CatalogItemId::from(aws_connection_id);
858    let item = scx.catalog.get_item(&id);
859    Ok(match item.connection()? {
860        Connection::Aws(_) => Some(AwsConnectionReference {
861            connection_id: id,
862            connection: id,
863        }),
864        _ => sql_bail!("{} is not an AWS connection", item.name().item),
865    })
866}
867fn get_gcp_connection_reference(
868    scx: &StatementContext,
869    conn_options: &ConnectionOptionExtracted,
870) -> Result<Option<GcpConnectionReference<ReferencedConnection>>, PlanError> {
871    let Some(gcp_connection_id) = conn_options.gcp_connection else {
872        return Ok(None);
873    };
874
875    let id = CatalogItemId::from(gcp_connection_id);
876    let item = scx.catalog.get_item(&id);
877    Ok(match item.connection()? {
878        Connection::Gcp(_) => Some(GcpConnectionReference {
879            connection_id: id,
880            connection: id,
881        }),
882        _ => sql_bail!("{} is not a GCP connection", item.name().item),
883    })
884}
885
/// Plans the TLS and SASL configuration of a Kafka connection from its
/// `SECURITY PROTOCOL`, `SSL ...`, `SASL ...`, and `AWS CONNECTION` options.
///
/// Returns `(tls, sasl)`. `tls` is `Some` only for the `SSL` and `SASL_SSL`
/// protocols, and `sasl` is `Some` only for `SASL_PLAINTEXT` and `SASL_SSL`.
/// The protocol name is matched case-insensitively. When `SECURITY PROTOCOL`
/// is omitted, `SASL_SSL` is chosen if any SASL option (including
/// `AWS CONNECTION`) was given, and `SSL` otherwise.
///
/// # Errors
///
/// Returns an error for:
/// - an unknown security protocol;
/// - `SSL CERTIFICATE` without `SSL KEY`;
/// - missing `SASL MECHANISMS`/`SASL USERNAME`/`SASL PASSWORD` when SASL is
///   used without an AWS connection;
/// - an `AWS CONNECTION` that does not resolve to an AWS connection;
/// - any supplied TLS/SASL option that the chosen configuration does not
///   consume.
fn plan_kafka_security(
    scx: &StatementContext,
    v: &ConnectionOptionExtracted,
) -> Result<
    (
        Option<KafkaTlsConfig>,
        Option<KafkaSaslConfig<ReferencedConnection>>,
    ),
    PlanError,
> {
    // Options whose presence implies SASL when no protocol is given.
    const SASL_CONFIGS: [ConnectionOptionName; 4] = [
        ConnectionOptionName::AwsConnection,
        ConnectionOptionName::SaslMechanisms,
        ConnectionOptionName::SaslUsername,
        ConnectionOptionName::SaslPassword,
    ];

    // Every security-related option this function is responsible for; any of
    // these left unconsumed at the end is rejected.
    const ALL_CONFIGS: [ConnectionOptionName; 7] = concat_arrays!(
        [
            ConnectionOptionName::SslKey,
            ConnectionOptionName::SslCertificate,
            ConnectionOptionName::SslCertificateAuthority,
        ],
        SASL_CONFIGS
    );

    enum SecurityProtocol {
        Plaintext,
        Ssl,
        SaslPlaintext,
        SaslSsl,
    }

    let security_protocol = v.security_protocol.as_ref().map(|v| v.to_uppercase());
    let security_protocol = match security_protocol.as_deref() {
        Some("PLAINTEXT") => SecurityProtocol::Plaintext,
        Some("SSL") => SecurityProtocol::Ssl,
        Some("SASL_PLAINTEXT") => SecurityProtocol::SaslPlaintext,
        Some("SASL_SSL") => SecurityProtocol::SaslSsl,
        Some(p) => sql_bail!("unknown security protocol: {}", p),
        // To be secure by default, if no security protocol is explicitly
        // specified, we always choose one of the SSL-enabled protocols, using
        // the presence of any SASL options to guide us between them. Users must
        // explicitly choose a plaintext mechanism if that's what they want.
        None if SASL_CONFIGS.iter().any(|c| v.seen.contains(c)) => SecurityProtocol::SaslSsl,
        None => SecurityProtocol::Ssl,
    };

    // Track every security option the user actually supplied. Each branch
    // below removes the options it consumes; anything left over is rejected at
    // the end as unsupported for the chosen configuration.
    let mut outstanding = ALL_CONFIGS
        .into_iter()
        .filter(|c| v.seen.contains(c))
        .collect::<BTreeSet<ConnectionOptionName>>();

    let tls = match security_protocol {
        SecurityProtocol::Ssl | SecurityProtocol::SaslSsl => {
            outstanding.remove(&ConnectionOptionName::SslCertificate);
            // SSL KEY is only consumed alongside SSL CERTIFICATE. A key given
            // without a certificate stays outstanding and is rejected below.
            let identity = match &v.ssl_certificate {
                None => None,
                Some(cert) => {
                    outstanding.remove(&ConnectionOptionName::SslKey);
                    let Some(key) = &v.ssl_key else {
                        sql_bail!("SSL KEY must be specified with SSL CERTIFICATE");
                    };
                    Some(TlsIdentity {
                        cert: cert.clone(),
                        key: (*key).into(),
                    })
                }
            };
            outstanding.remove(&ConnectionOptionName::SslCertificateAuthority);
            Some(KafkaTlsConfig {
                identity,
                root_cert: v.ssl_certificate_authority.clone(),
            })
        }
        _ => None,
    };

    let sasl = match security_protocol {
        SecurityProtocol::SaslPlaintext | SecurityProtocol::SaslSsl => {
            outstanding.remove(&ConnectionOptionName::AwsConnection);
            match get_aws_connection_reference(scx, v)? {
                // With an AWS connection, SASL uses the OAUTHBEARER mechanism.
                // SASL MECHANISMS/USERNAME/PASSWORD are not consumed here, so
                // if supplied they remain outstanding and are rejected below.
                Some(aws) => Some(KafkaSaslConfig {
                    mechanism: "OAUTHBEARER".into(),
                    username: "".into(),
                    password: None,
                    aws: Some(aws),
                }),
                None => {
                    outstanding.remove(&ConnectionOptionName::SaslMechanisms);
                    outstanding.remove(&ConnectionOptionName::SaslUsername);
                    outstanding.remove(&ConnectionOptionName::SaslPassword);
                    // TODO(benesch): support a less confusing `SASL MECHANISM`
                    // alias, as only a single mechanism can be specified.
                    let Some(mechanism) = &v.sasl_mechanisms else {
                        sql_bail!("SASL MECHANISMS must be specified");
                    };
                    let Some(username) = &v.sasl_username else {
                        sql_bail!("SASL USERNAME must be specified");
                    };
                    let Some(password) = &v.sasl_password else {
                        sql_bail!("SASL PASSWORD must be specified");
                    };
                    Some(KafkaSaslConfig {
                        // librdkafka requires SASL mechanisms to be upper case (PLAIN,
                        // SCRAM-SHA-256). For usability, we automatically uppercase the
                        // mechanism that user provides. This avoids a frustrating
                        // interaction with identifier case folding. Consider `SASL
                        // MECHANISMS = PLAIN`. Identifier case folding results in a
                        // SASL mechanism of `plain` (note the lowercase), which
                        // Materialize previously rejected with an error of "SASL
                        // mechanism must be uppercase." This was deeply frustrating for
                        // users who were not familiar with identifier case folding
                        // rules. See database-issues#6693.
                        mechanism: mechanism.to_uppercase(),
                        username: username.clone(),
                        password: Some((*password).into()),
                        aws: None,
                    })
                }
            }
        }
        _ => None,
    };

    // Reject the first (in `ConnectionOptionName` order) supplied option that
    // no branch above consumed.
    if let Some(outstanding) = outstanding.first() {
        sql_bail!("option {outstanding} not supported with this configuration");
    }

    Ok((tls, sasl))
}
1017pub fn plan_default_privatelink(
1018    scx: &StatementContext,
1019    pl: &mz_sql_parser::ast::ConnectionDefaultAwsPrivatelink<Aug>,
1020) -> Result<AwsPrivatelink, PlanError> {
1021    let id = pl.connection.item_id().clone();
1022    let entry = scx.catalog.get_item(&id);
1023    match entry.connection()? {
1024        Connection::AwsPrivatelink(_) => Ok(AwsPrivatelink {
1025            connection_id: id,
1026            // By default we do not specify an availability zone for the tunnel.
1027            availability_zone: None,
1028            // We always use the port as specified by the top-level connection.
1029            port: pl.port,
1030        }),
1031        _ => sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item),
1032    }
1033}
1034
1035pub fn plan_privatelink(
1036    scx: &StatementContext,
1037    pl: &mz_sql_parser::ast::KafkaBrokerAwsPrivatelink<Aug>,
1038) -> Result<AwsPrivatelink, PlanError> {
1039    let KafkaBrokerAwsPrivatelinkOptionExtracted {
1040        availability_zone,
1041        port,
1042        seen: _,
1043    } = KafkaBrokerAwsPrivatelinkOptionExtracted::try_from(pl.options.clone())?;
1044
1045    let id = match &pl.connection {
1046        ResolvedItemName::Item { id, .. } => id,
1047        _ => sql_bail!("internal error: Kafka PrivateLink connection was not resolved"),
1048    };
1049    let entry = scx.catalog.get_item(id);
1050    match entry.connection()? {
1051        Connection::AwsPrivatelink(connection) => {
1052            if let Some(az) = &availability_zone {
1053                if !connection.availability_zones.contains(az) {
1054                    sql_bail!(
1055                        "AWS PrivateLink availability zone {} does not match any of the \
1056                                      availability zones on the AWS PrivateLink connection {}",
1057                        az.quoted(),
1058                        scx.catalog
1059                            .resolve_full_name(entry.name())
1060                            .to_string()
1061                            .quoted()
1062                    )
1063                }
1064            }
1065            Ok(AwsPrivatelink {
1066                connection_id: *id,
1067                availability_zone,
1068                port,
1069            })
1070        }
1071        _ => {
1072            sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item)
1073        }
1074    }
1075}
1076
1077pub(crate) fn build_tunnel_definition(
1078    scx: &StatementContext,
1079    ssh_tunnel: Option<with_options::Object>,
1080    aws_privatelink: Option<ConnectionDefaultAwsPrivatelink<Aug>>,
1081    matching_rules: Option<Vec<KafkaMatchingBrokerRule<Aug>>>,
1082) -> Result<Tunnel<ReferencedConnection>, PlanError> {
1083    Ok(match (ssh_tunnel, aws_privatelink, matching_rules) {
1084        (None, None, None) => Tunnel::Direct,
1085        (Some(ssh_tunnel), None, None) => {
1086            let id = CatalogItemId::from(ssh_tunnel);
1087            let ssh_tunnel = scx.catalog.get_item(&id);
1088            match ssh_tunnel.connection()? {
1089                Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
1090                    connection_id: id,
1091                    connection: id,
1092                }),
1093                _ => sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item),
1094            }
1095        }
1096        (None, Some(aws_privatelink), None) => {
1097            Tunnel::AwsPrivatelink(plan_default_privatelink(scx, &aws_privatelink)?)
1098        }
1099        (None, None, Some(rules)) => {
1100            if rules.is_empty() {
1101                sql_bail!("BROKERS MATCHING rules list cannot be empty");
1102            }
1103
1104            let rules = rules
1105                .iter()
1106                .map(|rule| {
1107                    Ok(AwsPrivatelinkRule {
1108                        pattern: rule.pattern.clone(),
1109                        to: plan_privatelink(scx, &rule.tunnel)?,
1110                    })
1111                })
1112                .collect::<Result<Vec<_>, PlanError>>()?;
1113            Tunnel::AwsPrivatelinks(mz_storage_types::connections::AwsPrivatelinks { rules })
1114        }
1115        _ => {
1116            sql_bail!("cannot specify both SSH TUNNEL and AWS PRIVATELINK");
1117        }
1118    })
1119}