mz_sql/plan/statement/ddl/
connection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data definition language (DDL) utilities for CONNECTION objects.
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use anyhow::Context;
15use array_concat::concat_arrays;
16use itertools::Itertools;
17use maplit::btreemap;
18use mz_ore::num::NonNeg;
19use mz_ore::str::StrExt;
20use mz_postgres_util::tunnel::PostgresFlavor;
21use mz_repr::CatalogItemId;
22use mz_sql_parser::ast::ConnectionOptionName::*;
23use mz_sql_parser::ast::display::AstDisplay;
24use mz_sql_parser::ast::{
25    ConnectionDefaultAwsPrivatelink, ConnectionOption, ConnectionOptionName, CreateConnectionType,
26    KafkaBroker, KafkaBrokerAwsPrivatelinkOption, KafkaBrokerAwsPrivatelinkOptionName,
27    KafkaBrokerTunnel,
28};
29use mz_ssh_util::keys::SshKeyPair;
30use mz_storage_types::connections::aws::{
31    AwsAssumeRole, AwsAuth, AwsConnection, AwsConnectionReference, AwsCredentials,
32};
33use mz_storage_types::connections::inline::ReferencedConnection;
34use mz_storage_types::connections::string_or_secret::StringOrSecret;
35use mz_storage_types::connections::{
36    AwsPrivatelink, AwsPrivatelinkConnection, CsrConnection, CsrConnectionHttpAuth,
37    KafkaConnection, KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, MySqlConnection,
38    MySqlSslMode, PostgresConnection, SqlServerConnectionDetails, SshConnection, SshTunnel,
39    TlsIdentity, Tunnel,
40};
41
42use crate::names::Aug;
43use crate::plan::statement::{Connection, ResolvedItemName};
44use crate::plan::with_options;
45use crate::plan::{ConnectionDetails, PlanError, SshKey, StatementContext};
46use crate::session::vars::{self, ENABLE_AWS_MSK_IAM_AUTH};
47
// Invokes `generate_extracted_config!` for `ConnectionOption`. The rest of this file uses the
// resulting `ConnectionOptionExtracted` type: each `(Name, Type)` pair below is read through a
// snake_case field of that type (e.g. `AccessKeyId` as `access_key_id`), and the names that
// were supplied are read through its `seen` field.
//
// NOTE(review): the fields are consumed as `Option`s below, presumably `None` when the option
// was not specified; confirm against the macro definition.
generate_extracted_config!(
    ConnectionOption,
    (AccessKeyId, StringOrSecret),
    (AssumeRoleArn, String),
    (AssumeRoleSessionName, String),
    (AvailabilityZones, Vec<String>),
    (AwsConnection, with_options::Object),
    (AwsPrivatelink, ConnectionDefaultAwsPrivatelink<Aug>),
    (Broker, Vec<KafkaBroker<Aug>>),
    (Brokers, Vec<KafkaBroker<Aug>>),
    (Database, String),
    (Endpoint, String),
    (Host, String),
    (Password, with_options::Secret),
    (Port, u16),
    (ProgressTopic, String),
    (ProgressTopicReplicationFactor, i32),
    (PublicKey1, String),
    (PublicKey2, String),
    (Region, String),
    (SaslMechanisms, String),
    (SaslPassword, with_options::Secret),
    (SaslUsername, StringOrSecret),
    (SecretAccessKey, with_options::Secret),
    (SecurityProtocol, String),
    (ServiceName, String),
    (SshTunnel, with_options::Object),
    (SslCertificate, StringOrSecret),
    (SslCertificateAuthority, StringOrSecret),
    (SslKey, with_options::Secret),
    (SslMode, String),
    (SessionToken, StringOrSecret),
    (Url, String),
    (User, StringOrSecret)
);
84
// Invokes `generate_extracted_config!` for the per-broker `AWS PRIVATELINK` options. The
// resulting `KafkaBrokerAwsPrivatelinkOptionExtracted` type is built via `try_from` in
// `get_brokers` and destructured into `availability_zone`, `port`, and `seen`.
generate_extracted_config!(
    KafkaBrokerAwsPrivatelinkOption,
    (AvailabilityZone, String),
    (Port, u16)
);
90
/// Options which cannot be changed using ALTER CONNECTION.
///
/// Both entries configure the progress topic, which is only a permitted option for Kafka
/// connections (see `validate_options_per_connection_type`).
pub(crate) const INALTERABLE_OPTIONS: &[ConnectionOptionName] =
    &[ProgressTopic, ProgressTopicReplicationFactor];
94
/// Sets of options of which only one may be specified.
///
/// Each inner slice is one such set; currently the only set is `BROKER` versus `BROKERS`.
pub(crate) const MUTUALLY_EXCLUSIVE_SETS: &[&[ConnectionOptionName]] = &[&[Broker, Brokers]];
97
98pub(super) fn validate_options_per_connection_type(
99    t: CreateConnectionType,
100    mut options: BTreeSet<ConnectionOptionName>,
101) -> Result<(), PlanError> {
102    use mz_sql_parser::ast::ConnectionOptionName::*;
103    let permitted_options = match t {
104        CreateConnectionType::Aws => [
105            AccessKeyId,
106            SecretAccessKey,
107            SessionToken,
108            Endpoint,
109            Region,
110            AssumeRoleArn,
111            AssumeRoleSessionName,
112        ]
113        .as_slice(),
114        CreateConnectionType::AwsPrivatelink => &[AvailabilityZones, Port, ServiceName],
115        CreateConnectionType::Csr => &[
116            AwsPrivatelink,
117            Password,
118            Port,
119            SshTunnel,
120            SslCertificate,
121            SslCertificateAuthority,
122            SslKey,
123            Url,
124            User,
125        ],
126        CreateConnectionType::Kafka => &[
127            AwsConnection,
128            Broker,
129            Brokers,
130            ProgressTopic,
131            ProgressTopicReplicationFactor,
132            AwsPrivatelink,
133            SshTunnel,
134            SslKey,
135            SslCertificate,
136            SslCertificateAuthority,
137            SaslMechanisms,
138            SaslUsername,
139            SaslPassword,
140            SecurityProtocol,
141        ],
142        CreateConnectionType::Postgres | CreateConnectionType::Yugabyte => &[
143            AwsPrivatelink,
144            Database,
145            Host,
146            Password,
147            Port,
148            SshTunnel,
149            SslCertificate,
150            SslCertificateAuthority,
151            SslKey,
152            SslMode,
153            User,
154        ],
155        CreateConnectionType::Ssh => &[Host, Port, User, PublicKey1, PublicKey2],
156        CreateConnectionType::MySql => &[
157            AwsPrivatelink,
158            Host,
159            Password,
160            Port,
161            SshTunnel,
162            SslCertificate,
163            SslCertificateAuthority,
164            SslKey,
165            SslMode,
166            User,
167            AwsConnection,
168        ],
169        CreateConnectionType::SqlServer => &[
170            AwsPrivatelink,
171            Database,
172            Host,
173            Password,
174            Port,
175            SshTunnel,
176            SslCertificate,
177            SslCertificateAuthority,
178            SslKey,
179            SslMode,
180            User,
181        ],
182    };
183
184    for o in permitted_options {
185        options.remove(o);
186    }
187
188    if !options.is_empty() {
189        sql_bail!(
190            "{} connections do not support {} values",
191            t,
192            options.iter().join(", ")
193        )
194    }
195
196    Ok(())
197}
198
199impl ConnectionOptionExtracted {
200    pub(super) fn ensure_only_valid_options(
201        &self,
202        t: CreateConnectionType,
203    ) -> Result<(), PlanError> {
204        validate_options_per_connection_type(t, self.seen.clone())
205    }
206
207    pub fn try_into_connection_details(
208        self,
209        scx: &StatementContext,
210        connection_type: CreateConnectionType,
211    ) -> Result<ConnectionDetails, PlanError> {
212        self.ensure_only_valid_options(connection_type)?;
213
214        let connection: ConnectionDetails = match connection_type {
215            CreateConnectionType::Aws => {
216                let credentials = match (
217                    self.access_key_id,
218                    self.secret_access_key,
219                    self.session_token,
220                ) {
221                    (Some(access_key_id), Some(secret_access_key), session_token) => {
222                        Some(AwsCredentials {
223                            access_key_id,
224                            secret_access_key: secret_access_key.into(),
225                            session_token,
226                        })
227                    }
228                    (None, None, None) => None,
229                    _ => {
230                        sql_bail!(
231                            "must specify both ACCESS KEY ID and SECRET ACCESS KEY with optional SESSION TOKEN"
232                        );
233                    }
234                };
235
236                let assume_role = match (self.assume_role_arn, self.assume_role_session_name) {
237                    (Some(arn), session_name) => Some(AwsAssumeRole { arn, session_name }),
238                    (None, Some(_)) => {
239                        sql_bail!(
240                            "must specify ASSUME ROLE ARN with optional ASSUME ROLE SESSION NAME"
241                        );
242                    }
243                    _ => None,
244                };
245
246                let auth = match (credentials, assume_role) {
247                    (None, None) => sql_bail!(
248                        "must specify either ASSUME ROLE ARN or ACCESS KEY ID and SECRET ACCESS KEY"
249                    ),
250                    (Some(credentials), None) => AwsAuth::Credentials(credentials),
251                    (None, Some(assume_role)) => AwsAuth::AssumeRole(assume_role),
252                    (Some(_), Some(_)) => {
253                        sql_bail!("cannot specify both ACCESS KEY ID and ASSUME ROLE ARN");
254                    }
255                };
256
257                ConnectionDetails::Aws(AwsConnection {
258                    auth,
259                    endpoint: match self.endpoint {
260                        // TODO(benesch): this should not treat an empty endpoint as equivalent to a `NULL`
261                        // endpoint, but making that change now would break testdrive. AWS connections are
262                        // all behind feature flags mode right now, so no particular urgency to correct
263                        // this.
264                        Some(endpoint) if !endpoint.is_empty() => Some(endpoint),
265                        _ => None,
266                    },
267                    region: self.region,
268                })
269            }
270            CreateConnectionType::AwsPrivatelink => {
271                let connection = AwsPrivatelinkConnection {
272                    service_name: self
273                        .service_name
274                        .ok_or_else(|| sql_err!("SERVICE NAME option is required"))?,
275                    availability_zones: self
276                        .availability_zones
277                        .ok_or_else(|| sql_err!("AVAILABILITY ZONES option is required"))?,
278                };
279                if let Some(supported_azs) = scx.catalog.aws_privatelink_availability_zones() {
280                    let mut unique_azs: BTreeSet<String> = BTreeSet::new();
281                    let mut duplicate_azs: BTreeSet<String> = BTreeSet::new();
282                    // Validate each AZ is supported
283                    for connection_az in &connection.availability_zones {
284                        if unique_azs.contains(connection_az) {
285                            duplicate_azs.insert(connection_az.to_string());
286                        } else {
287                            unique_azs.insert(connection_az.to_string());
288                        }
289                        if !supported_azs.contains(connection_az) {
290                            return Err(PlanError::InvalidPrivatelinkAvailabilityZone {
291                                name: connection_az.to_string(),
292                                supported_azs,
293                            });
294                        }
295                    }
296                    if duplicate_azs.len() > 0 {
297                        return Err(PlanError::DuplicatePrivatelinkAvailabilityZone {
298                            duplicate_azs,
299                        });
300                    }
301                }
302                ConnectionDetails::AwsPrivatelink(connection)
303            }
304            CreateConnectionType::Kafka => {
305                let (tls, sasl) = plan_kafka_security(scx, &self)?;
306
307                ConnectionDetails::Kafka(KafkaConnection {
308                    brokers: self.get_brokers(scx)?,
309                    default_tunnel: scx
310                        .build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?,
311                    progress_topic: self.progress_topic,
312                    progress_topic_options: KafkaTopicOptions {
313                        // We only allow configuring the progress topic replication factor for now.
314                        // For correctness, the partition count MUST be one and for performance the compaction
315                        // policy MUST be enabled.
316                        partition_count: Some(NonNeg::try_from(1).expect("1 is positive")),
317                        replication_factor: self.progress_topic_replication_factor.map(|val| {
318                            if val <= 0 {
319                                Err(sql_err!("invalid CONNECTION: PROGRESS TOPIC REPLICATION FACTOR must be greater than 0"))?
320                            }
321                            NonNeg::try_from(val).map_err(|e| sql_err!("{e}"))
322                        }).transpose()?,
323                        topic_config: btreemap! {
324                            "cleanup.policy".to_string() => "compact".to_string(),
325                        },
326                    },
327                    options: BTreeMap::new(),
328                    tls,
329                    sasl,
330                })
331            }
332            CreateConnectionType::Csr => {
333                let url: reqwest::Url = match self.url {
334                    Some(url) => url
335                        .parse()
336                        .map_err(|e| sql_err!("parsing schema registry url: {e}"))?,
337                    None => sql_bail!("invalid CONNECTION: must specify URL"),
338                };
339                let _ = url
340                    .host_str()
341                    .ok_or_else(|| sql_err!("invalid CONNECTION: URL must specify domain name"))?;
342                if url.path() != "/" {
343                    sql_bail!("invalid CONNECTION: URL must have an empty path");
344                }
345                let cert = self.ssl_certificate;
346                let key = self.ssl_key.map(|secret| secret.into());
347                let tls_identity = match (cert, key) {
348                    (None, None) => None,
349                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
350                    _ => sql_bail!(
351                        "invalid CONNECTION: reading from SSL-auth Confluent Schema Registry requires both SSL KEY and SSL CERTIFICATE"
352                    ),
353                };
354                let http_auth = self.user.map(|username| CsrConnectionHttpAuth {
355                    username,
356                    password: self.password.map(|secret| secret.into()),
357                });
358
359                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
360                if let Some(privatelink) = self.aws_privatelink.as_ref() {
361                    if privatelink.port.is_some() {
362                        sql_bail!(
363                            "invalid CONNECTION: PORT in AWS PRIVATELINK is only supported for kafka"
364                        )
365                    }
366                }
367                let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;
368
369                ConnectionDetails::Csr(CsrConnection {
370                    url,
371                    tls_root_cert: self.ssl_certificate_authority,
372                    tls_identity,
373                    http_auth,
374                    tunnel,
375                })
376            }
377            CreateConnectionType::Postgres | CreateConnectionType::Yugabyte => {
378                if matches!(connection_type, CreateConnectionType::Yugabyte) {
379                    scx.require_feature_flag(&vars::ENABLE_YUGABYTE_CONNECTION)?;
380                }
381
382                let cert = self.ssl_certificate;
383                let key = self.ssl_key.map(|secret| secret.into());
384                let tls_identity = match (cert, key) {
385                    (None, None) => None,
386                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
387                    _ => sql_bail!(
388                        "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
389                    ),
390                };
391                let tls_mode = match self.ssl_mode.as_ref().map(|m| m.as_str()) {
392                    None | Some("disable") => tokio_postgres::config::SslMode::Disable,
393                    // "prefer" intentionally omitted because it has dubious security
394                    // properties.
395                    Some("require") | Some("required") => tokio_postgres::config::SslMode::Require,
396                    Some("verify_ca") | Some("verify-ca") => {
397                        tokio_postgres::config::SslMode::VerifyCa
398                    }
399                    Some("verify_full") | Some("verify-full") => {
400                        tokio_postgres::config::SslMode::VerifyFull
401                    }
402                    Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
403                };
404
405                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
406                if let Some(privatelink) = self.aws_privatelink.as_ref() {
407                    if privatelink.port.is_some() {
408                        sql_bail!(
409                            "invalid CONNECTION: PORT in AWS PRIVATELINK is only supported for kafka"
410                        )
411                    }
412                }
413                let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;
414
415                ConnectionDetails::Postgres(PostgresConnection {
416                    database: self
417                        .database
418                        .ok_or_else(|| sql_err!("DATABASE option is required"))?,
419                    password: self.password.map(|password| password.into()),
420                    host: self
421                        .host
422                        .ok_or_else(|| sql_err!("HOST option is required"))?,
423                    port: self.port.unwrap_or(5432_u16),
424                    tunnel,
425                    tls_mode,
426                    tls_root_cert: self.ssl_certificate_authority,
427                    tls_identity,
428                    user: self
429                        .user
430                        .ok_or_else(|| sql_err!("USER option is required"))?,
431                    flavor: match connection_type {
432                        CreateConnectionType::Postgres => PostgresFlavor::Vanilla,
433                        CreateConnectionType::Yugabyte => PostgresFlavor::Yugabyte,
434                        _ => unreachable!(),
435                    },
436                })
437            }
438            CreateConnectionType::Ssh => {
439                let ensure_key = |public_key| match public_key {
440                    Some(public_key) => Ok::<_, anyhow::Error>(SshKey::PublicOnly(public_key)),
441                    None => {
442                        let key = SshKeyPair::new().context("creating SSH key")?;
443                        Ok(SshKey::Both(key))
444                    }
445                };
446                ConnectionDetails::Ssh {
447                    connection: SshConnection {
448                        host: self
449                            .host
450                            .ok_or_else(|| sql_err!("HOST option is required"))?,
451                        port: self.port.unwrap_or(22_u16),
452                        user: match self
453                            .user
454                            .ok_or_else(|| sql_err!("USER option is required"))?
455                        {
456                            StringOrSecret::String(user) => user,
457                            StringOrSecret::Secret(_) => {
458                                sql_bail!(
459                                    "SSH connections do not support supplying USER value as SECRET"
460                                )
461                            }
462                        },
463                    },
464                    key_1: ensure_key(self.public_key1)?,
465                    key_2: ensure_key(self.public_key2)?,
466                }
467            }
468            CreateConnectionType::MySql => {
469                let aws_connection = get_aws_connection_reference(scx, &self)?;
470                if aws_connection.is_some() && self.password.is_some() {
471                    sql_bail!(
472                        "invalid CONNECTION: AWS IAM authentication is not supported with password"
473                    );
474                }
475
476                let cert = self.ssl_certificate;
477                let key = self.ssl_key.map(|secret| secret.into());
478                let tls_identity = match (cert, key) {
479                    (None, None) => None,
480                    (Some(cert), Some(key)) => Some(TlsIdentity { cert, key }),
481                    _ => sql_bail!(
482                        "invalid CONNECTION: both SSL KEY and SSL CERTIFICATE are required"
483                    ),
484                };
485                // Accepts the same SSL Mode values as the MySQL Client
486                // https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-mode
487                let tls_mode = match self
488                    .ssl_mode
489                    .map(|f| f.to_uppercase())
490                    .as_ref()
491                    .map(|m| m.as_str())
492                {
493                    None | Some("DISABLED") => {
494                        if aws_connection.is_some() {
495                            sql_bail!(
496                                "invalid CONNECTION: AWS IAM authentication requires SSL to be enabled"
497                            )
498                        }
499                        MySqlSslMode::Disabled
500                    }
501                    // "preferred" intentionally omitted because it has dubious security
502                    // properties.
503                    Some("REQUIRED") | Some("REQUIRE") => MySqlSslMode::Required,
504                    Some("VERIFY_CA") | Some("VERIFY-CA") => MySqlSslMode::VerifyCa,
505                    Some("VERIFY_IDENTITY") | Some("VERIFY-IDENTITY") => {
506                        MySqlSslMode::VerifyIdentity
507                    }
508                    Some(m) => sql_bail!("invalid CONNECTION: unknown SSL MODE {}", m.quoted()),
509                };
510
511                // TODO we should move to self.port being unsupported if aws_privatelink is some, see <https://github.com/MaterializeInc/database-issues/issues/7359#issuecomment-1925443977>
512                if let Some(privatelink) = self.aws_privatelink.as_ref() {
513                    if privatelink.port.is_some() {
514                        sql_bail!(
515                            "invalid CONNECTION: PORT in AWS PRIVATELINK is only supported for kafka"
516                        )
517                    }
518                }
519                let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;
520
521                ConnectionDetails::MySql(MySqlConnection {
522                    password: self.password.map(|password| password.into()),
523                    host: self
524                        .host
525                        .ok_or_else(|| sql_err!("HOST option is required"))?,
526                    port: self.port.unwrap_or(3306_u16),
527                    tunnel,
528                    tls_mode,
529                    tls_root_cert: self.ssl_certificate_authority,
530                    tls_identity,
531                    user: self
532                        .user
533                        .ok_or_else(|| sql_err!("USER option is required"))?,
534                    aws_connection,
535                })
536            }
537            CreateConnectionType::SqlServer => {
538                scx.require_feature_flag(&vars::ENABLE_SQL_SERVER_SOURCE)?;
539
540                let aws_connection = get_aws_connection_reference(scx, &self)?;
541                // TODO(sql_server2): Support AWS connections for SQL Server. Nothing fundamental
542                // prevents this, just need to wire it up.
543                if aws_connection.is_some() {
544                    return Err(PlanError::Unsupported {
545                        feature: "AWS CONNECTION with SQL Server".to_string(),
546                        discussion_no: None,
547                    });
548                }
549
550                // TODO(sql_server2): Parse the encryption level from the create SQL.
551                let encryption = mz_sql_server_util::config::EncryptionLevel::Preferred;
552
553                // 1433 is the default port for SQL Server instances running over TCP.
554                //
555                // See: <https://learn.microsoft.com/en-us/sql/database-engine/configure-windows/configure-a-server-to-listen-on-a-specific-tcp-port?view=sql-server-ver16>
556                let port = self.port.unwrap_or(1433_u16);
557                let tunnel = scx.build_tunnel_definition(self.ssh_tunnel, self.aws_privatelink)?;
558
559                ConnectionDetails::SqlServer(SqlServerConnectionDetails {
560                    host: self
561                        .host
562                        .ok_or_else(|| sql_err!("HOST option is required"))?,
563                    port,
564                    database: self
565                        .database
566                        .ok_or_else(|| sql_err!("DATABASE option is required"))?,
567                    user: self
568                        .user
569                        .ok_or_else(|| sql_err!("USER option is required"))?,
570                    password: self
571                        .password
572                        .ok_or_else(|| sql_err!("PASSWORD option is required"))
573                        .map(|pass| pass.into())?,
574                    tunnel,
575                    encryption,
576                })
577            }
578        };
579
580        Ok(connection)
581    }
582
583    pub fn get_brokers(
584        &self,
585        scx: &StatementContext,
586    ) -> Result<Vec<mz_storage_types::connections::KafkaBroker<ReferencedConnection>>, PlanError>
587    {
588        let mut brokers = match (&self.broker, &self.brokers, &self.aws_privatelink) {
589            (Some(v), None, None) => v.to_vec(),
590            (None, Some(v), None) => v.to_vec(),
591            (None, None, Some(_)) => vec![],
592            (None, None, None) => {
593                sql_bail!("invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK")
594            }
595            _ => sql_bail!(
596                "invalid CONNECTION: can only set one of BROKER, BROKERS, or AWS PRIVATELINK"
597            ),
598        };
599
600        // NOTE: we allow broker configurations to be mixed and matched. If/when we support
601        // a top-level `SSH TUNNEL` configuration, we will need additional assertions.
602
603        let mut out = vec![];
604        for broker in &mut brokers {
605            if broker.address.contains(',') {
606                sql_bail!("invalid CONNECTION: cannot specify multiple Kafka broker addresses in one string.\n\n
607Instead, specify BROKERS using multiple strings, e.g. BROKERS ('kafka:9092', 'kafka:9093')");
608            }
609
610            let tunnel = match &broker.tunnel {
611                KafkaBrokerTunnel::Direct => Tunnel::Direct,
612                KafkaBrokerTunnel::AwsPrivatelink(aws_privatelink) => {
613                    let KafkaBrokerAwsPrivatelinkOptionExtracted {
614                        availability_zone,
615                        port,
616                        seen: _,
617                    } = KafkaBrokerAwsPrivatelinkOptionExtracted::try_from(
618                        aws_privatelink.options.clone(),
619                    )?;
620
621                    let id = match &aws_privatelink.connection {
622                        ResolvedItemName::Item { id, .. } => id,
623                        _ => sql_bail!(
624                            "internal error: Kafka PrivateLink connection was not resolved"
625                        ),
626                    };
627                    let entry = scx.catalog.get_item(id);
628                    match entry.connection()? {
629                        Connection::AwsPrivatelink(connection) => {
630                            if let Some(az) = &availability_zone {
631                                if !connection.availability_zones.contains(az) {
632                                    sql_bail!(
633                                        "AWS PrivateLink availability zone {} does not match any of the \
634                                      availability zones on the AWS PrivateLink connection {}",
635                                        az.quoted(),
636                                        scx.catalog
637                                            .resolve_full_name(entry.name())
638                                            .to_string()
639                                            .quoted()
640                                    )
641                                }
642                            }
643                            Tunnel::AwsPrivatelink(AwsPrivatelink {
644                                connection_id: *id,
645                                availability_zone,
646                                port,
647                            })
648                        }
649                        _ => {
650                            sql_bail!("{} is not an AWS PRIVATELINK connection", entry.name().item)
651                        }
652                    }
653                }
654                KafkaBrokerTunnel::SshTunnel(ssh) => {
655                    let id = match &ssh {
656                        ResolvedItemName::Item { id, .. } => id,
657                        _ => sql_bail!(
658                            "internal error: Kafka SSH tunnel connection was not resolved"
659                        ),
660                    };
661                    let ssh_tunnel = scx.catalog.get_item(id);
662                    match ssh_tunnel.connection()? {
663                        Connection::Ssh(_connection) => Tunnel::Ssh(SshTunnel {
664                            connection_id: *id,
665                            connection: *id,
666                        }),
667                        _ => {
668                            sql_bail!("{} is not an SSH connection", ssh_tunnel.name().item)
669                        }
670                    }
671                }
672            };
673
674            out.push(mz_storage_types::connections::KafkaBroker {
675                address: broker.address.clone(),
676                tunnel,
677            });
678        }
679
680        Ok(out)
681    }
682}
683
684fn get_aws_connection_reference(
685    scx: &StatementContext,
686    conn_options: &ConnectionOptionExtracted,
687) -> Result<Option<AwsConnectionReference<ReferencedConnection>>, PlanError> {
688    let Some(aws_connection_id) = conn_options.aws_connection else {
689        return Ok(None);
690    };
691
692    let id = CatalogItemId::from(aws_connection_id);
693    let item = scx.catalog.get_item(&id);
694    Ok(match item.connection()? {
695        Connection::Aws(_) => Some(AwsConnectionReference {
696            connection_id: id,
697            connection: id,
698        }),
699        _ => sql_bail!("{} is not an AWS connection", item.name().item),
700    })
701}
702
703fn plan_kafka_security(
704    scx: &StatementContext,
705    v: &ConnectionOptionExtracted,
706) -> Result<
707    (
708        Option<KafkaTlsConfig>,
709        Option<KafkaSaslConfig<ReferencedConnection>>,
710    ),
711    PlanError,
712> {
713    const SASL_CONFIGS: [ConnectionOptionName; 4] = [
714        ConnectionOptionName::AwsConnection,
715        ConnectionOptionName::SaslMechanisms,
716        ConnectionOptionName::SaslUsername,
717        ConnectionOptionName::SaslPassword,
718    ];
719
720    const ALL_CONFIGS: [ConnectionOptionName; 7] = concat_arrays!(
721        [
722            ConnectionOptionName::SslKey,
723            ConnectionOptionName::SslCertificate,
724            ConnectionOptionName::SslCertificateAuthority,
725        ],
726        SASL_CONFIGS
727    );
728
729    enum SecurityProtocol {
730        Plaintext,
731        Ssl,
732        SaslPlaintext,
733        SaslSsl,
734    }
735
736    let security_protocol = v.security_protocol.as_ref().map(|v| v.to_uppercase());
737    let security_protocol = match security_protocol.as_deref() {
738        Some("PLAINTEXT") => SecurityProtocol::Plaintext,
739        Some("SSL") => SecurityProtocol::Ssl,
740        Some("SASL_PLAINTEXT") => SecurityProtocol::SaslPlaintext,
741        Some("SASL_SSL") => SecurityProtocol::SaslSsl,
742        Some(p) => sql_bail!("unknown security protocol: {}", p),
743        // To be secure by default, if no security protocol is explicitly
744        // specified, we always choose one of the SSL-enabled protocols, using
745        // the presence of any SASL options to guide us between them. Users must
746        // explicitly choose a plaintext mechanism if that's what they want.
747        None if SASL_CONFIGS.iter().any(|c| v.seen.contains(c)) => SecurityProtocol::SaslSsl,
748        None => SecurityProtocol::Ssl,
749    };
750
751    let mut outstanding = ALL_CONFIGS
752        .into_iter()
753        .filter(|c| v.seen.contains(c))
754        .collect::<BTreeSet<ConnectionOptionName>>();
755
756    let tls = match security_protocol {
757        SecurityProtocol::Ssl | SecurityProtocol::SaslSsl => {
758            outstanding.remove(&ConnectionOptionName::SslCertificate);
759            let identity = match &v.ssl_certificate {
760                None => None,
761                Some(cert) => {
762                    outstanding.remove(&ConnectionOptionName::SslKey);
763                    let Some(key) = &v.ssl_key else {
764                        sql_bail!("SSL KEY must be specified with SSL CERTIFICATE");
765                    };
766                    Some(TlsIdentity {
767                        cert: cert.clone(),
768                        key: (*key).into(),
769                    })
770                }
771            };
772            outstanding.remove(&ConnectionOptionName::SslCertificateAuthority);
773            Some(KafkaTlsConfig {
774                identity,
775                root_cert: v.ssl_certificate_authority.clone(),
776            })
777        }
778        _ => None,
779    };
780
781    let sasl = match security_protocol {
782        SecurityProtocol::SaslPlaintext | SecurityProtocol::SaslSsl => {
783            outstanding.remove(&ConnectionOptionName::AwsConnection);
784            match get_aws_connection_reference(scx, v)? {
785                Some(aws) => {
786                    scx.require_feature_flag(&ENABLE_AWS_MSK_IAM_AUTH)?;
787                    Some(KafkaSaslConfig {
788                        mechanism: "OAUTHBEARER".into(),
789                        username: "".into(),
790                        password: None,
791                        aws: Some(aws),
792                    })
793                }
794                None => {
795                    outstanding.remove(&ConnectionOptionName::SaslMechanisms);
796                    outstanding.remove(&ConnectionOptionName::SaslUsername);
797                    outstanding.remove(&ConnectionOptionName::SaslPassword);
798                    // TODO(benesch): support a less confusing `SASL MECHANISM`
799                    // alias, as only a single mechanism that can be specified.
800                    let Some(mechanism) = &v.sasl_mechanisms else {
801                        sql_bail!("SASL MECHANISMS must be specified");
802                    };
803                    let Some(username) = &v.sasl_username else {
804                        sql_bail!("SASL USERNAME must be specified");
805                    };
806                    let Some(password) = &v.sasl_password else {
807                        sql_bail!("SASL PASSWORD must be specified");
808                    };
809                    Some(KafkaSaslConfig {
810                        // librdkafka requires SASL mechanisms to be upper case (PLAIN,
811                        // SCRAM-SHA-256). For usability, we automatically uppercase the
812                        // mechanism that user provides. This avoids a frustrating
813                        // interaction with identifier case folding. Consider `SASL
814                        // MECHANISMS = PLAIN`. Identifier case folding results in a
815                        // SASL mechanism of `plain` (note the lowercase), which
816                        // Materialize previously rejected with an error of "SASL
817                        // mechanism must be uppercase." This was deeply frustarting for
818                        // users who were not familiar with identifier case folding
819                        // rules. See database-issues#6693.
820                        mechanism: mechanism.to_uppercase(),
821                        username: username.clone(),
822                        password: Some((*password).into()),
823                        aws: None,
824                    })
825                }
826            }
827        }
828        _ => None,
829    };
830
831    if let Some(outstanding) = outstanding.first() {
832        sql_bail!("option {outstanding} not supported with this configuration");
833    }
834
835    Ok((tls, sasl))
836}