Skip to main content

mz_sql/plan/statement/ddl/
connection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data definition language (DDL) utilities for CONNECTION objects.
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use anyhow::Context;
15use array_concat::concat_arrays;
16use itertools::Itertools;
17use maplit::btreemap;
18use mz_ore::num::NonNeg;
19use mz_ore::str::StrExt;
20use mz_repr::CatalogItemId;
21use mz_sql_parser::ast::ConnectionOptionName::*;
22use mz_sql_parser::ast::display::AstDisplay;
23use mz_sql_parser::ast::{
24    ConnectionDefaultAwsPrivatelink, ConnectionOption, ConnectionOptionName, CreateConnectionType,
25    KafkaBroker, KafkaBrokerAwsPrivatelinkOption, KafkaBrokerAwsPrivatelinkOptionName,
26    KafkaBrokerTunnel, KafkaMatchingBrokerRule,
27};
28use mz_ssh_util::keys::SshKeyPair;
29use mz_storage_types::connections::aws::{
30    AwsAssumeRole, AwsAuth, AwsConnection, AwsConnectionReference, AwsCredentials,
31};
32use mz_storage_types::connections::inline::ReferencedConnection;
33use mz_storage_types::connections::string_or_secret::StringOrSecret;
34use mz_storage_types::connections::{
35    AwsPrivatelink, AwsPrivatelinkConnection, AwsPrivatelinkRule, CsrConnection,
36    CsrConnectionHttpAuth, GlueSchemaRegistryConnection, IcebergCatalogConnection,
37    IcebergCatalogImpl, IcebergCatalogType, KafkaConnection, KafkaSaslConfig, KafkaTlsConfig,
38    KafkaTopicOptions, MySqlConnection, MySqlSslMode, PostgresConnection, RestIcebergCatalog,
39    S3TablesRestIcebergCatalog, SqlServerConnectionDetails, SshConnection, SshTunnel, TlsIdentity,
40    Tunnel,
41};
42
43use crate::names::Aug;
44use crate::plan::statement::{Connection, ResolvedItemName};
45use crate::plan::with_options::{self};
46use crate::plan::{ConnectionDetails, PlanError, SshKey, StatementContext};
47use crate::session::vars;
48
// Declares the options understood by `CREATE CONNECTION`: each entry pairs an
// option name with the Rust type its value is extracted as.
//
// NOTE(review): the macro is defined elsewhere. Judging from its use in this
// file it appears to generate `ConnectionOptionExtracted`, holding one
// optional snake_case field per entry (e.g. `access_key_id`) plus a `seen` set
// of the option names that were supplied — confirm against the macro.
generate_extracted_config!(
    ConnectionOption,
    (AccessKeyId, StringOrSecret),
    (AssumeRoleArn, String),
    (AssumeRoleSessionName, String),
    (AvailabilityZones, Vec<String>),
    (AwsConnection, with_options::Object),
    (AwsPrivatelink, ConnectionDefaultAwsPrivatelink<Aug>),
    (Broker, Vec<KafkaBroker<Aug>>),
    (Brokers, with_options::BrokersList),
    (Credential, StringOrSecret),
    (Database, String),
    (Endpoint, String),
    (Host, String),
    (Password, with_options::Secret),
    (Port, u16),
    (ProgressTopic, String),
    (ProgressTopicReplicationFactor, i32),
    (PublicKey1, String),
    (PublicKey2, String),
    (Region, String),
    (Registry, String),
    (SaslMechanisms, String),
    (SaslPassword, with_options::Secret),
    (SaslUsername, StringOrSecret),
    (Scope, String),
    (SecretAccessKey, with_options::Secret),
    (SecurityProtocol, String),
    (ServiceName, String),
    (SshTunnel, with_options::Object),
    (SslCertificate, StringOrSecret),
    (SslCertificateAuthority, StringOrSecret),
    (SslKey, with_options::Secret),
    (SslMode, String),
    (SessionToken, StringOrSecret),
    (CatalogType, IcebergCatalogType),
    (Url, String),
    (User, StringOrSecret),
    (Warehouse, String)
);
89
// Declares the options accepted by `KafkaBrokerAwsPrivatelinkOption`, pairing
// each option name with the Rust type its value is extracted as.
//
// NOTE(review): presumably generates a `KafkaBrokerAwsPrivatelinkOptionExtracted`
// struct analogous to `ConnectionOptionExtracted` — confirm against the macro.
generate_extracted_config!(
    KafkaBrokerAwsPrivatelinkOption,
    (AvailabilityZone, String),
    (Port, u16)
);
95
96/// Options which cannot be changed using ALTER CONNECTION.
97pub(crate) const INALTERABLE_OPTIONS: &[ConnectionOptionName] =
98    &[ProgressTopic, ProgressTopicReplicationFactor];
99
100/// Options of which only one may be specified.
101pub(crate) const MUTUALLY_EXCLUSIVE_SETS: &[&[ConnectionOptionName]] = &[&[Broker, Brokers]];
102
103pub(super) fn validate_options_per_connection_type(
104    t: CreateConnectionType,
105    mut options: BTreeSet<ConnectionOptionName>,
106) -> Result<(), PlanError> {
107    use mz_sql_parser::ast::ConnectionOptionName::*;
108    let permitted_options = match t {
109        CreateConnectionType::Aws => [
110            AccessKeyId,
111            SecretAccessKey,
112            SessionToken,
113            Endpoint,
114            Region,
115            AssumeRoleArn,
116            AssumeRoleSessionName,
117        ]
118        .as_slice(),
119        CreateConnectionType::AwsPrivatelink => &[AvailabilityZones, Port, ServiceName],
120        CreateConnectionType::GlueSchemaRegistry => &[AwsConnection, Registry],
121        CreateConnectionType::Csr => &[
122            AwsPrivatelink,
123            Password,
124            Port,
125            SshTunnel,
126            SslCertificate,
127            SslCertificateAuthority,
128            SslKey,
129            Url,
130            User,
131        ],
132        CreateConnectionType::Kafka => &[
133            AwsConnection,
134            Broker,
135            Brokers,
136            ProgressTopic,
137            ProgressTopicReplicationFactor,
138            AwsPrivatelink,
139            SshTunnel,
140            SslKey,
141            SslCertificate,
142            SslCertificateAuthority,
143            SaslMechanisms,
144            SaslUsername,
145            SaslPassword,
146            SecurityProtocol,
147        ],
148        CreateConnectionType::Postgres => &[
149            AwsPrivatelink,
150            Database,
151            Host,
152            Password,
153            Port,
154            SshTunnel,
155            SslCertificate,
156            SslCertificateAuthority,
157            SslKey,
158            SslMode,
159            User,
160        ],
161        CreateConnectionType::Ssh => &[Host, Port, User, PublicKey1, PublicKey2],
162        CreateConnectionType::MySql => &[
163            AwsPrivatelink,
164            Host,
165            Password,
166            Port,
167            SshTunnel,
168            SslCertificate,
169            SslCertificateAuthority,
170            SslKey,
171            SslMode,
172            User,
173            AwsConnection,
174        ],
175        CreateConnectionType::SqlServer => &[
176            AwsPrivatelink,
177            Database,
178            Host,
179            Password,
180            Port,
181            SshTunnel,
182            SslCertificate,
183            SslCertificateAuthority,
184            SslKey,
185            SslMode,
186            User,
187        ],
188        CreateConnectionType::IcebergCatalog => &[
189            AwsConnection,
190            CatalogType,
191            Credential,
192            Scope,
193            Url,
194            Warehouse,
195        ],
196    };
197
198    for o in permitted_options {
199        options.remove(o);
200    }
201
202    if !options.is_empty() {
203        sql_bail!(
204            "{} connections do not support {} values",
205            t,
206            options.iter().join(", ")
207        )
208    }
209
210    Ok(())
211}
212
213impl ConnectionOptionExtracted {
214    pub(super) fn ensure_only_valid_options(
215        &self,
216        t: CreateConnectionType,
217    ) -> Result<(), PlanError> {
218        validate_options_per_connection_type(t, self.seen.clone())
219    }
220
221    pub fn try_into_connection_details(
222        self,
223        scx: &StatementContext,
224        connection_type: CreateConnectionType,
225    ) -> Result<ConnectionDetails, PlanError> {
226        self.ensure_only_valid_options(connection_type)?;
227
228        let connection: ConnectionDetails = match connection_type {
229            CreateConnectionType::Aws => {
230                let credentials = match (
231                    self.access_key_id,
232                    self.secret_access_key,
233                    self.session_token,
234                ) {
235                    (Some(access_key_id), Some(secret_access_key), session_token) => {
236                        Some(AwsCredentials {
237                            access_key_id,
238                            secret_access_key: secret_access_key.into(),
239                            session_token,
240                        })
241                    }
242                    (None, None, None) => None,
243                    _ => {
244                        sql_bail!(
245                            "must specify both ACCESS KEY ID and SECRET ACCESS KEY with optional SESSION TOKEN"
246                        );
247                    }
248                };
249
250                let assume_role = match (self.assume_role_arn, self.assume_role_session_name) {
251                    (Some(arn), session_name) => Some(AwsAssumeRole { arn, session_name }),
252                    (None, Some(_)) => {
253                        sql_bail!(
254                            "must specify ASSUME ROLE ARN with optional ASSUME ROLE SESSION NAME"
255                        );
256                    }
257                    _ => None,
258                };
259
260                let auth = match (credentials, assume_role) {
261                    (None, None) => sql_bail!(
262                        "must specify either ASSUME ROLE ARN or ACCESS KEY ID and SECRET ACCESS KEY"
263                    ),
264                    (Some(credentials), None) => AwsAuth::Credentials(credentials),
265                    (None, Some(assume_role)) => AwsAuth::AssumeRole(assume_role),
266                    (Some(_), Some(_)) => {
267                        sql_bail!("cannot specify both ACCESS KEY ID and ASSUME ROLE ARN");
268                    }
269                };
270
271                ConnectionDetails::Aws(AwsConnection {
272                    auth,
273                    endpoint: match self.endpoint {
274                        // TODO(benesch): this should not treat an empty endpoint as equivalent to a `NULL`
275                        // endpoint, but making that change now would break testdrive. AWS connections are
276                        // all behind feature flags mode right now, so no particular urgency to correct
277                        // this.
278                        Some(endpoint) if !endpoint.is_empty() => Some(endpoint),
279                        _ => None,
280                    },
281                    region: self.region,
282                })
283            }
284            CreateConnectionType::AwsPrivatelink => {
285                let connection = AwsPrivatelinkConnection {
286                    service_name: self
287                        .service_name
288                        .ok_or_else(|| sql_err!("SERVICE NAME option is required"))?,
289                    availability_zones: self
290                        .availability_zones
291                        .ok_or_else(|| sql_err!("AVAILABILITY ZONES option is required"))?,
292                };
293                if let Some(supported_azs) = scx.catalog.aws_privatelink_availability_zones() {
294                    let mut unique_azs: BTreeSet<String> = BTreeSet::new();
295                    let mut duplicate_azs: BTreeSet<String> = BTreeSet::new();
296                    // Validate each AZ is supported
297                    for connection_az in &connection.availability_zones {
298                        if unique_azs.contains(connection_az) {
299                            duplicate_azs.insert(connection_az.to_string());
300                        } else {
301                            unique_azs.insert(connection_az.to_string());
302                        }
303                        if !supported_azs.contains(connection_az) {
304                            return Err(PlanError::InvalidPrivatelinkAvailabilityZone {
305                                name: connection_az.to_string(),
306                                supported_azs,
307                            });
308                        }
309                    }
310                    if duplicate_azs.len() > 0 {
311                        return Err(PlanError::DuplicatePrivatelinkAvailabilityZone {
312                            duplicate_azs,
313                        });
314                    }
315                }
316                ConnectionDetails::AwsPrivatelink(connection)
317            }
318            CreateConnectionType::Kafka => {
319                let (tls, sasl) = plan_kafka_security(scx, &self)?;
320                let (static_brokers, matching_rules) = self.get_brokers_and_rules(scx)?;
321
322                if !matching_rules.is_empty() {
323                    scx.require_feature_flag(&vars::ENABLE_KAFKA_BROKER_MATCHING_RULES)?;
324                }
325
326                ConnectionDetails::Kafka(KafkaConnection {
327                    brokers: static_brokers,
328                    default_tunnel: build_tunnel_definition(
329                            scx,
330                            self.ssh_tunnel,
331                            self.aws_privatelink,
332                            if matching_rules.is_empty() { None } else { Some(matching_rules) },
333                        )?,
334                    progress_topic: self.progress_topic,
335                    progress_topic_options: KafkaTopicOptions {
336                        // We only allow configuring the progress topic replication factor for now.
337                        // For correctness, the partition count MUST be one and for performance the compaction
338                        // policy MUST be enabled.
339                        partition_count: Some(NonNeg::try_from(1).expect("1 is positive")),
340                        replication_factor: self.progress_topic_replication_factor.map(|val| {
341                            if val <= 0 {
342                                Err(sql_err!("invalid CONNECTION: PROGRESS TOPIC REPLICATION FACTOR must be greater than 0"))?
343                            }
344                            NonNeg::try_from(val).map_err(|e| sql_err!("{e}"))
345                        }).transpose()?,
346                        topic_config: btreemap! {
347                            "cleanup.policy".to_string() => "compact".to_string(),
348                            "segment.bytes".to_string() => "134217728".to_string(), // 128 MiB
349                        },
350                    },
351                    options: BTreeMap::new(),
352                    tls,
353                    sasl,
354                })
355            }
356            CreateConnectionType::Csr => {
357                let url: reqwest::Url = match self.url {
358                    Some(url) => url
359                        .parse()
360                        .map_err(|e| sql_err!("parsing schema registry url: {e}"))?,
361                    None => sql_bail!("invalid CONNECTION: must specify URL"),
362                };
363                let _ = url
364                    .host_str()
365                    .ok_or_else(|| sql_err!("invalid CONNECTION: URL must specify domain name"))?;
366                if url.path() != "/" {
367                    sql_bail!("invalid CONNECTION: URL must have an empty path");
368                }
369                let cert = self.ssl_certificate;
370                let key = self.ssl_key.map(|secret| secret.into());
371                let tls_identity = match (cert, key) {
372                    (None, None) => None,
373                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
374                    _ => sql_bail!(
375                        "invalid CONNECTION: reading from SSL-auth Confluent Schema Registry requires both SSL KEY and SSL CERTIFICATE"
376                    ),
377                };
378                let http_auth = self.user.map(|username| CsrConnectionHttpAuth {
379                    username,
380                    password: self.password.map(|secret| secret.into()),
381                });
382
383                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
384                if let Some(privatelink) = self.aws_privatelink.as_ref() {
385                    if privatelink.port.is_some() {
386                        sql_bail!(
387                            "invalid CONNECTION: CONFLUENT SCHEMA REGISTRY does not support PORT for AWS PRIVATELINK"
388                        )
389                    }
390                }
391                let tunnel = build_tunnel_definition(
392                    scx,
393                    self.ssh_tunnel,
394                    self.aws_privatelink,
395                    None, /* Rule-based PrivateLink is not supported for CSR. */
396                )?;
397
398                ConnectionDetails::Csr(CsrConnection {
399                    url,
400                    tls_root_cert: self.ssl_certificate_authority,
401                    tls_identity,
402                    http_auth,
403                    tunnel,
404                })
405            }
406            CreateConnectionType::GlueSchemaRegistry => {
407                scx.require_feature_flag(&vars::ENABLE_GLUE_SCHEMA_REGISTRY)?;
408
409                let aws_connection = get_aws_connection_reference(scx, &self)?
410                    .ok_or_else(|| sql_err!("AWS CONNECTION option is required"))?;
411                let registry_name = self
412                    .registry
413                    .ok_or_else(|| sql_err!("REGISTRY option is required"))?;
414                if registry_name.is_empty() {
415                    sql_bail!("invalid CONNECTION: REGISTRY must not be empty");
416                }
417
418                ConnectionDetails::GlueSchemaRegistry(GlueSchemaRegistryConnection {
419                    aws_connection,
420                    registry_name,
421                })
422            }
423            CreateConnectionType::Postgres => {
424                let cert = self.ssl_certificate;
425                let key = self.ssl_key.map(|secret| secret.into());
426                let tls_identity = match (cert, key) {
427                    (None, None) => None,
428                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
429                    _ => sql_bail!(
430                        "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
431                    ),
432                };
433                let tls_mode = match self.ssl_mode.as_ref().map(|m| m.as_str()) {
434                    None | Some("disable") => tokio_postgres::config::SslMode::Disable,
435                    // "prefer" intentionally omitted because it has dubious security
436                    // properties.
437                    Some("require") | Some("required") => tokio_postgres::config::SslMode::Require,
438                    Some("verify_ca") | Some("verify-ca") => {
439                        tokio_postgres::config::SslMode::VerifyCa
440                    }
441                    Some("verify_full") | Some("verify-full") => {
442                        tokio_postgres::config::SslMode::VerifyFull
443                    }
444                    Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
445                };
446
447                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
448                if let Some(privatelink) = self.aws_privatelink.as_ref() {
449                    if privatelink.port.is_some() {
450                        sql_bail!(
451                            "invalid CONNECTION: POSTGRES does not support PORT for AWS PRIVATELINK"
452                        )
453                    }
454                }
455                let tunnel = build_tunnel_definition(
456                    scx,
457                    self.ssh_tunnel,
458                    self.aws_privatelink,
459                    None, /* Rule-based PrivateLink is not supported for Postgres. */
460                )?;
461
462                ConnectionDetails::Postgres(PostgresConnection {
463                    database: self
464                        .database
465                        .ok_or_else(|| sql_err!("DATABASE option is required"))?,
466                    password: self.password.map(|password| password.into()),
467                    host: self
468                        .host
469                        .ok_or_else(|| sql_err!("HOST option is required"))?,
470                    port: self.port.unwrap_or(5432_u16),
471                    tunnel,
472                    tls_mode,
473                    tls_root_cert: self.ssl_certificate_authority,
474                    tls_identity,
475                    user: self
476                        .user
477                        .ok_or_else(|| sql_err!("USER option is required"))?,
478                })
479            }
480            CreateConnectionType::Ssh => {
481                let ensure_key = |public_key| match public_key {
482                    Some(public_key) => Ok::<_, anyhow::Error>(SshKey::PublicOnly(public_key)),
483                    None => {
484                        let key = SshKeyPair::new().context("creating SSH key")?;
485                        Ok(SshKey::Both(key))
486                    }
487                };
488                ConnectionDetails::Ssh {
489                    connection: SshConnection {
490                        host: self
491                            .host
492                            .ok_or_else(|| sql_err!("HOST option is required"))?,
493                        port: self.port.unwrap_or(22_u16),
494                        user: match self
495                            .user
496                            .ok_or_else(|| sql_err!("USER option is required"))?
497                        {
498                            StringOrSecret::String(user) => user,
499                            StringOrSecret::Secret(_) => {
500                                sql_bail!(
501                                    "SSH connections do not support supplying USER value as SECRET"
502                                )
503                            }
504                        },
505                    },
506                    key_1: ensure_key(self.public_key1)?,
507                    key_2: ensure_key(self.public_key2)?,
508                }
509            }
510            CreateConnectionType::MySql => {
511                let aws_connection = get_aws_connection_reference(scx, &self)?;
512                if aws_connection.is_some() && self.password.is_some() {
513                    sql_bail!(
514                        "invalid CONNECTION: AWS IAM authentication is not supported with password"
515                    );
516                }
517
518                let cert = self.ssl_certificate;
519                let key = self.ssl_key.map(|secret| secret.into());
520                let tls_identity = match (cert, key) {
521                    (None, None) => None,
522                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
523                    _ => sql_bail!(
524                        "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
525                    ),
526                };
527                // Accepts the same SSL Mode values as the MySQL Client
528                // https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-mode
529                let tls_mode = match self
530                    .ssl_mode
531                    .map(|f| f.to_uppercase())
532                    .as_ref()
533                    .map(|m| m.as_str())
534                {
535                    None | Some("DISABLED") => {
536                        if aws_connection.is_some() {
537                            sql_bail!(
538                                "invalid CONNECTION: AWS IAM authentication requires SSL to be enabled"
539                            )
540                        }
541                        MySqlSslMode::Disabled
542                    }
543                    // "preferred" intentionally omitted because it has dubious security
544                    // properties.
545                    Some("REQUIRED") | Some("REQUIRE") => MySqlSslMode::Required,
546                    Some("VERIFY_CA") | Some("VERIFY-CA") => MySqlSslMode::VerifyCa,
547                    Some("VERIFY_IDENTITY") | Some("VERIFY-IDENTITY") => {
548                        MySqlSslMode::VerifyIdentity
549                    }
550                    Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
551                };
552
553                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
554                if let Some(privatelink) = self.aws_privatelink.as_ref() {
555                    if privatelink.port.is_some() {
556                        sql_bail!(
557                            "invalid CONNECTION: MYSQL does not support PORT for AWS PRIVATELINK"
558                        )
559                    }
560                }
561                let tunnel = build_tunnel_definition(
562                    scx,
563                    self.ssh_tunnel,
564                    self.aws_privatelink,
565                    None, /* Rule-based PrivateLink is not supported for MySQL. */
566                )?;
567
568                ConnectionDetails::MySql(MySqlConnection {
569                    password: self.password.map(|password| password.into()),
570                    host: self
571                        .host
572                        .ok_or_else(|| sql_err!("HOST option is required"))?,
573                    port: self.port.unwrap_or(3306_u16),
574                    tunnel,
575                    tls_mode,
576                    tls_root_cert: self.ssl_certificate_authority,
577                    tls_identity,
578                    user: self
579                        .user
580                        .ok_or_else(|| sql_err!("USER option is required"))?,
581                    aws_connection,
582                })
583            }
584            CreateConnectionType::SqlServer => {
585                let aws_connection = get_aws_connection_reference(scx, &self)?;
586                if aws_connection.is_some() && self.password.is_some() {
587                    sql_bail!(
588                        "invalid CONNECTION: AWS IAM authentication is not supported with password"
589                    );
590                }
591
592                let (encryption, certificate_validation_policy) = match self
593                    .ssl_mode
594                    .map(|mode| mode.to_uppercase())
595                    .as_ref()
596                    .map(|mode| mode.as_str())
597                {
598                    None | Some("DISABLED") => (
599                        mz_sql_server_util::config::EncryptionLevel::None,
600                        mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
601                    ),
602                    Some("REQUIRED") => (
603                        mz_sql_server_util::config::EncryptionLevel::Required,
604                        mz_sql_server_util::config::CertificateValidationPolicy::TrustAll,
605                    ),
606                    Some("VERIFY") => (
607                        mz_sql_server_util::config::EncryptionLevel::Required,
608                        mz_sql_server_util::config::CertificateValidationPolicy::VerifySystem,
609                    ),
610                    Some("VERIFY_CA") => {
611                        if self.ssl_certificate_authority.is_none() {
612                            sql_bail!(
613                                "invalid CONNECTION: SSL MODE 'verify_ca' requires SSL CERTIFICATE AUTHORITY"
614                            );
615                        }
616                        (
617                            mz_sql_server_util::config::EncryptionLevel::Required,
618                            mz_sql_server_util::config::CertificateValidationPolicy::VerifyCA,
619                        )
620                    }
621                    Some(mode) => {
622                        sql_bail!("invalid CONNECTION: unknown SSL MODE {}", mode.quoted())
623                    }
624                };
625
626                if let Some(privatelink) = self.aws_privatelink.as_ref() {
627                    if privatelink.port.is_some() {
628                        sql_bail!(
629                            "invalid CONNECTION: SQL SERVER does not support PORT for AWS PRIVATELINK"
630                        )
631                    }
632                }
633
634                // 1433 is the default port for SQL Server instances running over TCP.
635                //
636                // See: <https://learn.microsoft.com/en-us/sql/database-engine/configure-windows/configure-a-server-to-listen-on-a-specific-tcp-port?view=sql-server-ver16>
637                let port = self.port.unwrap_or(1433_u16);
638                let tunnel = build_tunnel_definition(
639                    scx,
640                    self.ssh_tunnel,
641                    self.aws_privatelink,
642                    None, /* Rule-based PrivateLink is not supported for SQL Server. */
643                )?;
644
645                ConnectionDetails::SqlServer(SqlServerConnectionDetails {
646                    host: self
647                        .host
648                        .ok_or_else(|| sql_err!("HOST option is required"))?,
649                    port,
650                    database: self
651                        .database
652                        .ok_or_else(|| sql_err!("DATABASE option is required"))?,
653                    user: self
654                        .user
655                        .ok_or_else(|| sql_err!("USER option is required"))?,
656                    password: self
657                        .password
658                        .ok_or_else(|| sql_err!("PASSWORD option is required"))
659                        .map(|pass| pass.into())?,
660                    tunnel,
661                    encryption,
662                    certificate_validation_policy,
663                    tls_root_cert: self.ssl_certificate_authority,
664                })
665            }
666            CreateConnectionType::IcebergCatalog => {
667                let catalog_type = self.catalog_type.clone().ok_or_else(|| {
668                    sql_err!("invalid CONNECTION: ICEBERG connections must specify CATALOG TYPE")
669                })?;
670
671                let uri: reqwest::Url = match &self.url {
672                    Some(url) => url
673                        .parse()
674                        .map_err(|e| sql_err!("parsing Iceberg catalog url: {e}"))?,
675                    None => sql_bail!("invalid CONNECTION: must specify URL"),
676                };
677
678                let warehouse = self.warehouse.clone();
679                let credential = self.credential.clone();
680                let aws_connection = get_aws_connection_reference(scx, &self)?;
681
682                let catalog = match catalog_type {
683                    IcebergCatalogType::S3TablesRest => {
684                        let Some(warehouse) = warehouse else {
685                            sql_bail!(
686                                "invalid CONNECTION: ICEBERG s3tablesrest connections must specify WAREHOUSE"
687                            );
688                        };
689                        let Some(aws_connection) = aws_connection else {
690                            sql_bail!(
691                                "invalid CONNECTION: ICEBERG s3tablesrest connections require an AWS connection"
692                            );
693                        };
694
695                        IcebergCatalogImpl::S3TablesRest(S3TablesRestIcebergCatalog {
696                            aws_connection,
697                            warehouse,
698                        })
699                    }
700                    IcebergCatalogType::Rest => {
701                        let Some(credential) = credential else {
702                            sql_bail!(
703                                "invalid CONNECTION: ICEBERG rest connections require a CREDENTIAL"
704                            );
705                        };
706
707                        IcebergCatalogImpl::Rest(RestIcebergCatalog {
708                            credential,
709                            scope: self.scope.clone(),
710                            warehouse,
711                        })
712                    }
713                };
714
715                ConnectionDetails::IcebergCatalog(IcebergCatalogConnection { catalog, uri })
716            }
717        };
718
719        Ok(connection)
720    }
721
722    pub fn get_brokers_and_rules(
723        &self,
724        scx: &StatementContext,
725    ) -> Result<
726        (
727            Vec<mz_storage_types::connections::KafkaBroker<ReferencedConnection>>,
728            Vec<KafkaMatchingBrokerRule<Aug>>,
729        ),
730        PlanError,
731    > {
732        // Collect all static broker sources and matching rules.
733        let mut all_brokers: Vec<KafkaBroker<Aug>> = vec![];
734        let mut matching_rules: Vec<KafkaMatchingBrokerRule<Aug>> = vec![];
735
736        // Check exclusions and extract brokers
737        match (&self.broker, &self.brokers, &self.aws_privatelink) {
738            // BROKER
739            (Some(broker), None, None) => all_brokers.extend(broker.iter().cloned()),
740            // BROKERS
741            (None, Some(broker_list), None) => {
742                all_brokers.extend(broker_list.static_entries.iter().cloned());
743                matching_rules.extend(broker_list.matching_rules.iter().cloned());
744            }
745            // AWS PRIVATELINK
746            (None, None, Some(_privatelink)) => {
747                // Noting to do here
748            }
749            // noting - invalid
750            (None, None, None) => {
751                sql_bail!("invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK")
752            }
753            // exclusive keywords provided
754            _ => sql_bail!(
755                "invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK"
756            ),
757        };
758
759        // MATCHING rules require at least one static broker for bootstrapping.
760        if !matching_rules.is_empty() && all_brokers.is_empty() {
761            sql_bail!(
762                "invalid CONNECTION: BROKERS must contain at least one static broker address"
763            );
764        }
765
766        // NOTE: we allow broker configurations to be mixed and matched. If/when we support
767        // a top-level `SSH TUNNEL` configuration, we will need additional assertions.
768        let mut out = vec![];
769        for broker in &all_brokers {
770            if broker.address.contains(',') {
771                sql_bail!(
772                    "invalid CONNECTION: cannot specify multiple Kafka broker addresses in one string.\n\nInstead, specify BROKERS using multiple strings, e.g. BROKERS ('kafka:9092', 'kafka:9093')"
773                );
774            }
775
776            let tunnel = match &broker.tunnel {
777                KafkaBrokerTunnel::Direct => Tunnel::Direct,
778                KafkaBrokerTunnel::AwsPrivatelink(aws_privatelink) => {
779                    Tunnel::AwsPrivatelink(plan_privatelink(scx, aws_privatelink)?)
780                }
781                KafkaBrokerTunnel::SshTunnel(ssh) => {
782                    let id = match &ssh {
783                        ResolvedItemName::Item { id, .. } => id,
784                        _ => sql_bail!(
785                            "internal error: Kafka SSH tunnel connection was not resolved"
786                        ),
787                    };
788                    let ssh_tunnel = scx.catalog.get_item(id);
789                    match ssh_tunnel.connection()? {
790                        Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
791                            connection_id: *id,
792                            connection: *id,
793                        }),
794                        _ => {
795                            sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item)
796                        }
797                    }
798                }
799            };
800
801            out.push(mz_storage_types::connections::KafkaBroker {
802                address: broker.address.clone(),
803                tunnel,
804            });
805        }
806
807        Ok((out, matching_rules))
808    }
809}
810
811fn get_aws_connection_reference(
812    scx: &StatementContext,
813    conn_options: &ConnectionOptionExtracted,
814) -> Result<Option<AwsConnectionReference<ReferencedConnection>>, PlanError> {
815    let Some(aws_connection_id) = conn_options.aws_connection else {
816        return Ok(None);
817    };
818
819    let id = CatalogItemId::from(aws_connection_id);
820    let item = scx.catalog.get_item(&id);
821    Ok(match item.connection()? {
822        Connection::Aws(_) => Some(AwsConnectionReference {
823            connection_id: id,
824            connection: id,
825        }),
826        _ => sql_bail!("{} is not an AWS connection", item.name().item),
827    })
828}
829
830fn plan_kafka_security(
831    scx: &StatementContext,
832    v: &ConnectionOptionExtracted,
833) -> Result<
834    (
835        Option<KafkaTlsConfig>,
836        Option<KafkaSaslConfig<ReferencedConnection>>,
837    ),
838    PlanError,
839> {
840    const SASL_CONFIGS: [ConnectionOptionName; 4] = [
841        ConnectionOptionName::AwsConnection,
842        ConnectionOptionName::SaslMechanisms,
843        ConnectionOptionName::SaslUsername,
844        ConnectionOptionName::SaslPassword,
845    ];
846
847    const ALL_CONFIGS: [ConnectionOptionName; 7] = concat_arrays!(
848        [
849            ConnectionOptionName::SslKey,
850            ConnectionOptionName::SslCertificate,
851            ConnectionOptionName::SslCertificateAuthority,
852        ],
853        SASL_CONFIGS
854    );
855
856    enum SecurityProtocol {
857        Plaintext,
858        Ssl,
859        SaslPlaintext,
860        SaslSsl,
861    }
862
863    let security_protocol = v.security_protocol.as_ref().map(|v| v.to_uppercase());
864    let security_protocol = match security_protocol.as_deref() {
865        Some("PLAINTEXT") => SecurityProtocol::Plaintext,
866        Some("SSL") => SecurityProtocol::Ssl,
867        Some("SASL_PLAINTEXT") => SecurityProtocol::SaslPlaintext,
868        Some("SASL_SSL") => SecurityProtocol::SaslSsl,
869        Some(p) => sql_bail!("unknown security protocol: {}", p),
870        // To be secure by default, if no security protocol is explicitly
871        // specified, we always choose one of the SSL-enabled protocols, using
872        // the presence of any SASL options to guide us between them. Users must
873        // explicitly choose a plaintext mechanism if that's what they want.
874        None if SASL_CONFIGS.iter().any(|c| v.seen.contains(c)) => SecurityProtocol::SaslSsl,
875        None => SecurityProtocol::Ssl,
876    };
877
878    let mut outstanding = ALL_CONFIGS
879        .into_iter()
880        .filter(|c| v.seen.contains(c))
881        .collect::<BTreeSet<ConnectionOptionName>>();
882
883    let tls = match security_protocol {
884        SecurityProtocol::Ssl | SecurityProtocol::SaslSsl => {
885            outstanding.remove(&ConnectionOptionName::SslCertificate);
886            let identity = match &v.ssl_certificate {
887                None => None,
888                Some(cert) => {
889                    outstanding.remove(&ConnectionOptionName::SslKey);
890                    let Some(key) = &v.ssl_key else {
891                        sql_bail!("SSL KEY must be specified with SSL CERTIFICATE");
892                    };
893                    Some(TlsIdentity {
894                        cert: cert.clone(),
895                        key: (*key).into(),
896                    })
897                }
898            };
899            outstanding.remove(&ConnectionOptionName::SslCertificateAuthority);
900            Some(KafkaTlsConfig {
901                identity,
902                root_cert: v.ssl_certificate_authority.clone(),
903            })
904        }
905        _ => None,
906    };
907
908    let sasl = match security_protocol {
909        SecurityProtocol::SaslPlaintext | SecurityProtocol::SaslSsl => {
910            outstanding.remove(&ConnectionOptionName::AwsConnection);
911            match get_aws_connection_reference(scx, v)? {
912                Some(aws) => Some(KafkaSaslConfig {
913                    mechanism: "OAUTHBEARER".into(),
914                    username: "".into(),
915                    password: None,
916                    aws: Some(aws),
917                }),
918                None => {
919                    outstanding.remove(&ConnectionOptionName::SaslMechanisms);
920                    outstanding.remove(&ConnectionOptionName::SaslUsername);
921                    outstanding.remove(&ConnectionOptionName::SaslPassword);
922                    // TODO(benesch): support a less confusing `SASL MECHANISM`
923                    // alias, as only a single mechanism that can be specified.
924                    let Some(mechanism) = &v.sasl_mechanisms else {
925                        sql_bail!("SASL MECHANISMS must be specified");
926                    };
927                    let Some(username) = &v.sasl_username else {
928                        sql_bail!("SASL USERNAME must be specified");
929                    };
930                    let Some(password) = &v.sasl_password else {
931                        sql_bail!("SASL PASSWORD must be specified");
932                    };
933                    Some(KafkaSaslConfig {
934                        // librdkafka requires SASL mechanisms to be upper case (PLAIN,
935                        // SCRAM-SHA-256). For usability, we automatically uppercase the
936                        // mechanism that user provides. This avoids a frustrating
937                        // interaction with identifier case folding. Consider `SASL
938                        // MECHANISMS = PLAIN`. Identifier case folding results in a
939                        // SASL mechanism of `plain` (note the lowercase), which
940                        // Materialize previously rejected with an error of "SASL
941                        // mechanism must be uppercase." This was deeply frustarting for
942                        // users who were not familiar with identifier case folding
943                        // rules. See database-issues#6693.
944                        mechanism: mechanism.to_uppercase(),
945                        username: username.clone(),
946                        password: Some((*password).into()),
947                        aws: None,
948                    })
949                }
950            }
951        }
952        _ => None,
953    };
954
955    if let Some(outstanding) = outstanding.first() {
956        sql_bail!("option {outstanding} not supported with this configuration");
957    }
958
959    Ok((tls, sasl))
960}
961pub fn plan_default_privatelink(
962    scx: &StatementContext,
963    pl: &mz_sql_parser::ast::ConnectionDefaultAwsPrivatelink<Aug>,
964) -> Result<AwsPrivatelink, PlanError> {
965    let id = pl.connection.item_id().clone();
966    let entry = scx.catalog.get_item(&id);
967    match entry.connection()? {
968        Connection::AwsPrivatelink(_) => Ok(AwsPrivatelink {
969            connection_id: id,
970            // By default we do not specify an availability zone for the tunnel.
971            availability_zone: None,
972            // We always use the port as specified by the top-level connection.
973            port: pl.port,
974        }),
975        _ => sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item),
976    }
977}
978
979pub fn plan_privatelink(
980    scx: &StatementContext,
981    pl: &mz_sql_parser::ast::KafkaBrokerAwsPrivatelink<Aug>,
982) -> Result<AwsPrivatelink, PlanError> {
983    let KafkaBrokerAwsPrivatelinkOptionExtracted {
984        availability_zone,
985        port,
986        seen: _,
987    } = KafkaBrokerAwsPrivatelinkOptionExtracted::try_from(pl.options.clone())?;
988
989    let id = match &pl.connection {
990        ResolvedItemName::Item { id, .. } => id,
991        _ => sql_bail!("internal error: Kafka PrivateLink connection was not resolved"),
992    };
993    let entry = scx.catalog.get_item(id);
994    match entry.connection()? {
995        Connection::AwsPrivatelink(connection) => {
996            if let Some(az) = &availability_zone {
997                if !connection.availability_zones.contains(az) {
998                    sql_bail!(
999                        "AWS PrivateLink availability zone {} does not match any of the \
1000                                      availability zones on the AWS PrivateLink connection {}",
1001                        az.quoted(),
1002                        scx.catalog
1003                            .resolve_full_name(entry.name())
1004                            .to_string()
1005                            .quoted()
1006                    )
1007                }
1008            }
1009            Ok(AwsPrivatelink {
1010                connection_id: *id,
1011                availability_zone,
1012                port,
1013            })
1014        }
1015        _ => {
1016            sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item)
1017        }
1018    }
1019}
1020
1021pub(crate) fn build_tunnel_definition(
1022    scx: &StatementContext,
1023    ssh_tunnel: Option<with_options::Object>,
1024    aws_privatelink: Option<ConnectionDefaultAwsPrivatelink<Aug>>,
1025    matching_rules: Option<Vec<KafkaMatchingBrokerRule<Aug>>>,
1026) -> Result<Tunnel<ReferencedConnection>, PlanError> {
1027    Ok(match (ssh_tunnel, aws_privatelink, matching_rules) {
1028        (None, None, None) => Tunnel::Direct,
1029        (Some(ssh_tunnel), None, None) => {
1030            let id = CatalogItemId::from(ssh_tunnel);
1031            let ssh_tunnel = scx.catalog.get_item(&id);
1032            match ssh_tunnel.connection()? {
1033                Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
1034                    connection_id: id,
1035                    connection: id,
1036                }),
1037                _ => sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item),
1038            }
1039        }
1040        (None, Some(aws_privatelink), None) => {
1041            Tunnel::AwsPrivatelink(plan_default_privatelink(scx, &aws_privatelink)?)
1042        }
1043        (None, None, Some(rules)) => {
1044            if rules.is_empty() {
1045                sql_bail!("BROKERS MATCHING rules list cannot be empty");
1046            }
1047
1048            let rules = rules
1049                .iter()
1050                .map(|rule| {
1051                    Ok(AwsPrivatelinkRule {
1052                        pattern: rule.pattern.clone(),
1053                        to: plan_privatelink(scx, &rule.tunnel)?,
1054                    })
1055                })
1056                .collect::<Result<Vec<_>, PlanError>>()?;
1057            Tunnel::AwsPrivatelinks(mz_storage_types::connections::AwsPrivatelinks { rules })
1058        }
1059        _ => {
1060            sql_bail!("cannot specify both SSH TUNNEL and AWS PRIVATELINK");
1061        }
1062    })
1063}