Skip to main content

mz_sql/plan/statement/
ddl.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data definition language (DDL).
11//!
12//! This module houses the handlers for statements that modify the catalog, like
13//! `ALTER`, `CREATE`, and `DROP`.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Write;
17use std::iter;
18use std::num::NonZeroU32;
19use std::time::Duration;
20
21use itertools::Itertools;
22use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
23use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
24use mz_arrow_util::builder::ArrowBuilder;
25use mz_auth::password::Password;
26use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
27use mz_expr::{CollectionPlan, UnmaterializableFunc};
28use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
29use mz_ore::cast::{CastFrom, TryCastFrom};
30use mz_ore::collections::{CollectionExt, HashSet};
31use mz_ore::num::NonNeg;
32use mz_ore::soft_panic_or_log;
33use mz_ore::str::StrExt;
34use mz_proto::RustType;
35use mz_repr::adt::interval::Interval;
36use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
37use mz_repr::network_policy_id::NetworkPolicyId;
38use mz_repr::optimize::OptimizerFeatureOverrides;
39use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
40use mz_repr::role_id::RoleId;
41use mz_repr::{
42    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
43    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
44    preserves_order, strconv,
45};
46use mz_sql_parser::ast::{
47    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
48    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
49    AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
50    AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
51    AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
52    AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
53    AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
54    AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
55    AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
56    ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
57    ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
58    ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
59    ConnectionOption, ConnectionOptionName, CreateClusterReplicaStatement, CreateClusterStatement,
60    CreateConnectionOption, CreateConnectionOptionName, CreateConnectionStatement,
61    CreateConnectionType, CreateDatabaseStatement, CreateIndexStatement,
62    CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
63    CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
64    CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
65    CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
66    CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
67    CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
68    CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
69    CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
70    CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
71    DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
72    FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
73    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
74    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
75    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
76    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
77    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
78    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
79    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
80    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
81    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
82    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
83    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
84};
85use mz_sql_parser::ident;
86use mz_sql_parser::parser::StatementParseResult;
87use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
88use mz_storage_types::connections::{Connection, KafkaTopicOptions};
89use mz_storage_types::sinks::{
90    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
91    SinkEnvelope, StorageSinkConnection, iceberg_type_overrides,
92};
93use mz_storage_types::sources::encoding::{
94    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
95    SourceDataEncoding, included_column_desc,
96};
97use mz_storage_types::sources::envelope::{
98    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
99};
100use mz_storage_types::sources::kafka::{
101    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
102};
103use mz_storage_types::sources::load_generator::{
104    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
105    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
106};
107use mz_storage_types::sources::mysql::{
108    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
109};
110use mz_storage_types::sources::postgres::{
111    PostgresSourceConnection, PostgresSourcePublicationDetails,
112    ProtoPostgresSourcePublicationDetails,
113};
114use mz_storage_types::sources::sql_server::{
115    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
116};
117use mz_storage_types::sources::{
118    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
119    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
120    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
121    SqlServerSourceExtras, Timeline,
122};
123use prost::Message;
124
125use crate::ast::display::AstDisplay;
126use crate::catalog::{
127    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
128    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
129};
130use crate::iceberg::IcebergSinkConfigOptionExtracted;
131use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
132use crate::names::{
133    Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
134    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
135    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
136};
137use crate::normalize::{self, ident};
138use crate::plan::error::PlanError;
139use crate::plan::query::{
140    ExprContext, QueryLifetime, plan_expr, scalar_type_from_catalog, scalar_type_from_sql,
141};
142use crate::plan::scope::Scope;
143use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
144use crate::plan::statement::{StatementContext, StatementDesc, scl};
145use crate::plan::typeconv::CastContext;
146use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
147use crate::plan::{
148    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
149    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
150    AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
151    AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
152    AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
153    AlterSourceTimestampIntervalPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
154    AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
155    ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
156    CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
157    CreateConnectionPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
158    CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
159    CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
160    DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
161    NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
162    PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
163    VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
164    WebhookValidation, literal, plan_utils, query, transform_ast,
165};
166use crate::session::vars::{
167    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
168    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
169    ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS,
170};
171use crate::{names, parse};
172
mod connection;

// TODO: Figure out what the maximum number of columns we can actually support is, and set that.
//
// The real max is probably higher than this, but it's easier to relax a constraint than make it
// more strict.
const MAX_NUM_COLUMNS: usize = 256;

/// Matches names of the form `r<digits>` (e.g. `r1`, `r12`).
// NOTE(review): the name suggests these are the auto-generated replica names of
// managed clusters — confirm against the call sites before relying on that.
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
183
184/// Given a relation desc and a column list, checks that:
185/// - the column list is a prefix of the desc;
186/// - all the listed columns are types that have meaningful Persist-level ordering.
187fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
188    if partition_by.len() > desc.len() {
189        tracing::error!(
190            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
191            desc.len(),
192            partition_by.len()
193        );
194        partition_by.truncate(desc.len());
195    }
196
197    let desc_prefix = desc.iter().take(partition_by.len());
198    for (idx, ((desc_name, desc_type), partition_name)) in
199        desc_prefix.zip_eq(partition_by).enumerate()
200    {
201        let partition_name = normalize::column_name(partition_name);
202        if *desc_name != partition_name {
203            sql_bail!(
204                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
205            );
206        }
207        if !preserves_order(&desc_type.scalar_type) {
208            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
209        }
210    }
211    Ok(())
212}
213
214pub fn describe_create_database(
215    _: &StatementContext,
216    _: CreateDatabaseStatement,
217) -> Result<StatementDesc, PlanError> {
218    Ok(StatementDesc::new(None))
219}
220
221pub fn plan_create_database(
222    _: &StatementContext,
223    CreateDatabaseStatement {
224        name,
225        if_not_exists,
226    }: CreateDatabaseStatement,
227) -> Result<Plan, PlanError> {
228    Ok(Plan::CreateDatabase(CreateDatabasePlan {
229        name: normalize::ident(name.0),
230        if_not_exists,
231    }))
232}
233
234pub fn describe_create_schema(
235    _: &StatementContext,
236    _: CreateSchemaStatement,
237) -> Result<StatementDesc, PlanError> {
238    Ok(StatementDesc::new(None))
239}
240
241pub fn plan_create_schema(
242    scx: &StatementContext,
243    CreateSchemaStatement {
244        mut name,
245        if_not_exists,
246    }: CreateSchemaStatement,
247) -> Result<Plan, PlanError> {
248    if name.0.len() > 2 {
249        sql_bail!("schema name {} has more than two components", name);
250    }
251    let schema_name = normalize::ident(
252        name.0
253            .pop()
254            .expect("names always have at least one component"),
255    );
256    let database_spec = match name.0.pop() {
257        None => match scx.catalog.active_database() {
258            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
259            None => sql_bail!("no database specified and no active database"),
260        },
261        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
262            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
263            Err(_) => sql_bail!("invalid database {}", n.as_str()),
264        },
265    };
266    Ok(Plan::CreateSchema(CreateSchemaPlan {
267        database_spec,
268        schema_name,
269        if_not_exists,
270    }))
271}
272
273pub fn describe_create_table(
274    _: &StatementContext,
275    _: CreateTableStatement<Aug>,
276) -> Result<StatementDesc, PlanError> {
277    Ok(StatementDesc::new(None))
278}
279
/// Plans a `CREATE TABLE` statement.
///
/// Builds the table's (versioned) relation description from the declared
/// columns, column options, and table constraints, validates that no object
/// of the same name already exists, and produces a `Plan::CreateTable`.
///
/// Unique/primary-key constraints, foreign keys, and check constraints are
/// not enforced; each is gated behind its own unsafe feature flag.
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    // Column names of the *initial* (non-versioned) columns only.
    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            // This set of `names` is used to create the initial RelationDesc.
            // Columns that have been added at later versions of the table will
            // get added further below.
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Build initial relation type that handles declared data types
    // and NOT NULL constraints.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    // Versioned column additions, keyed by the relation version at which each
    // column was added (BTreeMap keeps them in version order for replay below).
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Ensure expression can be planned and yields the correct
                    // type.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    // A single-column unique constraint is a single-column key.
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    // Record the column addition for replay against the
                    // versioned desc; it is excluded from the initial desc.
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        // TODO(alter_table): This assumes all versioned columns are at the
        // end. This will no longer be true when we support dropping columns.
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    // Constraint columns are looked up positionally against
                    // the initial (non-versioned) column names.
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                // PRIMARY KEY implies NOT NULL.
                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // Non-primary key unique constraints are only keys if all of their
                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                // The primary key, if any, is kept first in the key list.
                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                // Foreign key constraints are not presently enforced. We allow
                // them with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                // Check constraints are not presently enforced. We allow them
                // with feature flags for sqllogictest's sake.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        // Unique constraints are not presently enforced. We allow them with feature flags for
        // sqllogictest's sake.
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating tables when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Replay the versioned column additions, in version order, on top of the
    // initial desc; each addition must land at exactly its recorded version.
    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // Table options should only consider the original columns, since those
    // were the only ones in scope when the table was created.
    //
    // TODO(alter_table): Will need to reconsider this when we support ALTERing
    // the PARTITION BY columns.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    // Pull out the RETAIN HISTORY option, if present, as the compaction window.
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}
499
500pub fn describe_create_table_from_source(
501    _: &StatementContext,
502    _: CreateTableFromSourceStatement<Aug>,
503) -> Result<StatementDesc, PlanError> {
504    Ok(StatementDesc::new(None))
505}
506
507pub fn describe_create_webhook_source(
508    _: &StatementContext,
509    _: CreateWebhookSourceStatement<Aug>,
510) -> Result<StatementDesc, PlanError> {
511    Ok(StatementDesc::new(None))
512}
513
514pub fn describe_create_source(
515    _: &StatementContext,
516    _: CreateSourceStatement<Aug>,
517) -> Result<StatementDesc, PlanError> {
518    Ok(StatementDesc::new(None))
519}
520
521pub fn describe_create_subsource(
522    _: &StatementContext,
523    _: CreateSubsourceStatement<Aug>,
524) -> Result<StatementDesc, PlanError> {
525    Ok(StatementDesc::new(None))
526}
527
// Typed extraction of `CREATE SOURCE` WITH-options.
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

// Typed extraction of PostgreSQL source connection options.
generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Typed extraction of MySQL source connection options.
generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Typed extraction of SQL Server source connection options.
generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);
555
/// Plans a `CREATE SOURCE ... FROM WEBHOOK` (or its table form) statement.
///
/// Resolves the target cluster, validates the optional `CHECK` expression,
/// builds the relation description (body column, optional `headers` map
/// column, and any explicitly mapped header columns), and produces either a
/// `Plan::CreateTable` or `Plan::CreateSource` depending on `is_table`.
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    // The table form of webhook sources is gated behind a feature flag.
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value
    // we plan to normalize when we canonicalize the create statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        // We resolved `in_cluster` above, so we want to ignore it here.
        in_cluster: _,
    } = stmt;

    // Plan the optional CHECK (validation) expression.
    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // If the validation expression doesn't reference any part of the request, then we should
        // return an error because it's almost definitely wrong.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Validation expressions cannot contain unmaterializable functions, except `now()`. We
        // allow calls to `now()` because some webhook providers recommend rejecting requests that
        // are older than a certain threshold.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    // Only BYTES, JSON, and TEXT body formats are supported.
    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        // TODO(parkmycar): Make an issue to support more types, or change this to NeverSupported.
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    // `column_ty` and `column_names` are built up in lockstep: index i of one
    // corresponds to index i of the other.
    let mut column_ty = vec![
        // Always include the body of the request as the first column.
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // Include a `headers` column, possibly filtered.
    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        // Split the filter list into allow- and block-lists.
        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Map headers to specific columns.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    // Validate our columns.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    // Check for an object in the catalog with this same name
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Note(parkmycar): We don't currently support specifying a timeline for Webhook sources. As
    // such, we always use a default of EpochMilliseconds.
    let timeline = Timeline::EpochMilliseconds;

    // Emit a table plan or a source plan depending on the statement form; note
    // that the cluster id is attached in different places in the two cases.
    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            // Important: The cluster is set at the `Source` level.
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}
753
/// Plans a `CREATE SOURCE` statement into a [`Plan::CreateSource`].
///
/// Expects the statement to have already been purified: external references
/// (subsources) must be cleared, and connection `DETAILS`-style options
/// populated. Planning validates the envelope/format/metadata combinations,
/// derives the relation description for the source's primary export, applies
/// any `PRIMARY KEY NOT ENFORCED` constraint, validates the timestamp
/// interval, and resolves the target cluster and timeline.
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // If the new source table syntax is forced all the options related to the primary
    // source output should be un-set.
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    // An absent envelope clause is equivalent to `ENVELOPE NONE`.
    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    // `INCLUDE HEADERS` (and metadata more generally) is only meaningful for
    // connections that actually carry such metadata.
    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        // TODO(guswynn): should this be `bail_unsupported!`?
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    // Only Kafka sources contribute extra metadata columns to the relation.
    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Generate the relation description for the primary export of the source.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Apply user-specified key constraint
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        // Don't remove this without addressing
        // https://github.com/MaterializeInc/database-issues/issues/4371.
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        // Reject duplicate columns in the key list.
        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        // Map each key column name to its index in the relation, rejecting
        // unknown or ambiguous names.
        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        // A user-declared key may not override a key the envelope already
        // established.
        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            // Only validate bounds for new statements (pcx is Some), not during
            // catalog deserialization (pcx is None). Previously persisted sources
            // may have intervals that no longer fall within the current bounds.
            if scx.pcx.is_some() {
                let min = scx.catalog.system_vars().min_timestamp_interval();
                let max = scx.catalog.system_vars().max_timestamp_interval();
                if duration < min || duration > max {
                    return Err(PlanError::InvalidTimestampInterval {
                        min,
                        max,
                        requested: duration,
                    });
                }
            }
            duration
        }
        None => scx.catalog.system_vars().default_timestamp_interval(),
    };

    // A named progress subsource is only present with the old source syntax,
    // where the primary export's data config lives on the source itself.
    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    // Multi-output generators have no primary export details.
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            // NOTE(review): without a progress subsource, the source's own
            // relation becomes the connection's timestamp (progress) relation;
            // the data exports are presumably planned separately — confirm.
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating sources when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // We will rewrite the cluster if one is not provided, so we must use the
    // `in_cluster` value we plan to normalize when we canonicalize the create
    // statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // Determine a default timeline for the source.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}
1022
1023pub fn plan_generic_source_connection(
1024    scx: &StatementContext<'_>,
1025    source_connection: &CreateSourceConnection<Aug>,
1026    include_metadata: &Vec<SourceIncludeMetadata>,
1027) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
1028    Ok(match source_connection {
1029        CreateSourceConnection::Kafka {
1030            connection,
1031            options,
1032        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
1033            scx,
1034            connection,
1035            options,
1036            include_metadata,
1037        )?),
1038        CreateSourceConnection::Postgres {
1039            connection,
1040            options,
1041        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
1042            scx, connection, options,
1043        )?),
1044        CreateSourceConnection::SqlServer {
1045            connection,
1046            options,
1047        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
1048            scx, connection, options,
1049        )?),
1050        CreateSourceConnection::MySql {
1051            connection,
1052            options,
1053        } => {
1054            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
1055        }
1056        CreateSourceConnection::LoadGenerator { generator, options } => {
1057            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
1058                scx,
1059                generator,
1060                options,
1061                include_metadata,
1062            )?)
1063        }
1064    })
1065}
1066
1067fn plan_load_generator_source_connection(
1068    scx: &StatementContext<'_>,
1069    generator: &ast::LoadGenerator,
1070    options: &Vec<LoadGeneratorOption<Aug>>,
1071    include_metadata: &Vec<SourceIncludeMetadata>,
1072) -> Result<LoadGeneratorSourceConnection, PlanError> {
1073    let load_generator =
1074        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
1075    let LoadGeneratorOptionExtracted {
1076        tick_interval,
1077        as_of,
1078        up_to,
1079        ..
1080    } = options.clone().try_into()?;
1081    let tick_micros = match tick_interval {
1082        Some(interval) => Some(interval.as_micros().try_into()?),
1083        None => None,
1084    };
1085    if up_to < as_of {
1086        sql_bail!("UP TO cannot be less than AS OF");
1087    }
1088    Ok(LoadGeneratorSourceConnection {
1089        load_generator,
1090        tick_micros,
1091        as_of,
1092        up_to,
1093    })
1094}
1095
1096fn plan_mysql_source_connection(
1097    scx: &StatementContext<'_>,
1098    connection: &ResolvedItemName,
1099    options: &Vec<MySqlConfigOption<Aug>>,
1100) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
1101    let connection_item = scx.get_item_by_resolved_name(connection)?;
1102    match connection_item.connection()? {
1103        Connection::MySql(connection) => connection,
1104        _ => sql_bail!(
1105            "{} is not a MySQL connection",
1106            scx.catalog.resolve_full_name(connection_item.name())
1107        ),
1108    };
1109    let MySqlConfigOptionExtracted {
1110        details,
1111        // text/exclude columns are already part of the source-exports and are only included
1112        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1113        // be removed once we drop support for implicitly created subsources.
1114        text_columns: _,
1115        exclude_columns: _,
1116        seen: _,
1117    } = options.clone().try_into()?;
1118    let details = details
1119        .as_ref()
1120        .ok_or_else(|| internal_err!("MySQL source missing details"))?;
1121    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1122    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1123    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1124    Ok(MySqlSourceConnection {
1125        connection: connection_item.id(),
1126        connection_id: connection_item.id(),
1127        details,
1128    })
1129}
1130
1131fn plan_sqlserver_source_connection(
1132    scx: &StatementContext<'_>,
1133    connection: &ResolvedItemName,
1134    options: &Vec<SqlServerConfigOption<Aug>>,
1135) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
1136    let connection_item = scx.get_item_by_resolved_name(connection)?;
1137    match connection_item.connection()? {
1138        Connection::SqlServer(connection) => connection,
1139        _ => sql_bail!(
1140            "{} is not a SQL Server connection",
1141            scx.catalog.resolve_full_name(connection_item.name())
1142        ),
1143    };
1144    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
1145    let details = details
1146        .as_ref()
1147        .ok_or_else(|| internal_err!("SQL Server source missing details"))?;
1148    let extras = hex::decode(details)
1149        .map_err(|e| sql_err!("{e}"))
1150        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
1151        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
1152    Ok(SqlServerSourceConnection {
1153        connection_id: connection_item.id(),
1154        connection: connection_item.id(),
1155        extras,
1156    })
1157}
1158
1159fn plan_postgres_source_connection(
1160    scx: &StatementContext<'_>,
1161    connection: &ResolvedItemName,
1162    options: &Vec<PgConfigOption<Aug>>,
1163) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
1164    let connection_item = scx.get_item_by_resolved_name(connection)?;
1165    let PgConfigOptionExtracted {
1166        details,
1167        publication,
1168        // text columns are already part of the source-exports and are only included
1169        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1170        // be removed once we drop support for implicitly created subsources.
1171        text_columns: _,
1172        // exclude columns are already part of the source-exports and are only included
1173        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1174        // be removed once we drop support for implicitly created subsources.
1175        exclude_columns: _,
1176        seen: _,
1177    } = options.clone().try_into()?;
1178    let details = details
1179        .as_ref()
1180        .ok_or_else(|| internal_err!("Postgres source missing details"))?;
1181    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1182    let details =
1183        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1184    let publication_details =
1185        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1186    Ok(PostgresSourceConnection {
1187        connection: connection_item.id(),
1188        connection_id: connection_item.id(),
1189        // Validated during purification.
1190        publication: publication.ok_or_else(|| internal_err!("PUBLICATION option is required"))?,
1191        publication_details,
1192    })
1193}
1194
1195fn plan_kafka_source_connection(
1196    scx: &StatementContext<'_>,
1197    connection_name: &ResolvedItemName,
1198    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
1199    include_metadata: &Vec<SourceIncludeMetadata>,
1200) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
1201    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
1202    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
1203        sql_bail!(
1204            "{} is not a kafka connection",
1205            scx.catalog.resolve_full_name(connection_item.name())
1206        )
1207    }
1208    let KafkaSourceConfigOptionExtracted {
1209        group_id_prefix,
1210        topic,
1211        topic_metadata_refresh_interval,
1212        start_timestamp: _, // purified into `start_offset`
1213        start_offset,
1214        seen: _,
1215    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
1216    // Validated during purification.
1217    let topic = topic.ok_or_else(|| internal_err!("TOPIC option is required"))?;
1218    let mut start_offsets = BTreeMap::new();
1219    if let Some(offsets) = start_offset {
1220        for (part, offset) in offsets.iter().enumerate() {
1221            if *offset < 0 {
1222                sql_bail!("START OFFSET must be a nonnegative integer");
1223            }
1224            start_offsets.insert(i32::try_from(part)?, *offset);
1225        }
1226    }
1227    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
1228        // This is a librdkafka-enforced restriction that, if violated,
1229        // would result in a runtime error for the source.
1230        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
1231    }
1232    let metadata_columns = include_metadata
1233        .into_iter()
1234        .flat_map(|item| match item {
1235            SourceIncludeMetadata::Timestamp { alias } => {
1236                let name = match alias {
1237                    Some(name) => name.to_string(),
1238                    None => "timestamp".to_owned(),
1239                };
1240                Some((name, KafkaMetadataKind::Timestamp))
1241            }
1242            SourceIncludeMetadata::Partition { alias } => {
1243                let name = match alias {
1244                    Some(name) => name.to_string(),
1245                    None => "partition".to_owned(),
1246                };
1247                Some((name, KafkaMetadataKind::Partition))
1248            }
1249            SourceIncludeMetadata::Offset { alias } => {
1250                let name = match alias {
1251                    Some(name) => name.to_string(),
1252                    None => "offset".to_owned(),
1253                };
1254                Some((name, KafkaMetadataKind::Offset))
1255            }
1256            SourceIncludeMetadata::Headers { alias } => {
1257                let name = match alias {
1258                    Some(name) => name.to_string(),
1259                    None => "headers".to_owned(),
1260                };
1261                Some((name, KafkaMetadataKind::Headers))
1262            }
1263            SourceIncludeMetadata::Header {
1264                alias,
1265                key,
1266                use_bytes,
1267            } => Some((
1268                alias.to_string(),
1269                KafkaMetadataKind::Header {
1270                    key: key.clone(),
1271                    use_bytes: *use_bytes,
1272                },
1273            )),
1274            SourceIncludeMetadata::Key { .. } => {
1275                // handled below
1276                None
1277            }
1278        })
1279        .collect();
1280    Ok(KafkaSourceConnection {
1281        connection: connection_item.id(),
1282        connection_id: connection_item.id(),
1283        topic,
1284        start_offsets,
1285        group_id_prefix,
1286        topic_metadata_refresh_interval,
1287        metadata_columns,
1288    })
1289}
1290
/// Derives the relation description, planned [`SourceEnvelope`], and optional
/// data encoding for a source's primary export.
///
/// When a `FORMAT` is present, the incoming `value_desc` must be a single
/// `bytes` column that the format will decode; otherwise the connection's
/// default key/value descriptions are used as-is. The metadata columns in
/// `metadata_columns_desc` are folded into the final relation description by
/// the envelope.
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // If we are applying an encoding we need to ensure that the incoming value_desc is a
            // single column of type bytes.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            // Replace the raw bytes descriptions with the ones implied by the
            // declared format.
            let (key_desc, value_desc) = encoding.desc()?;

            // TODO(petrosagg): This piece of code seems to be making a statement about the
            // nullability of the NONE envelope when the source is Kafka. As written, the code
            // misses opportunities to mark columns as not nullable and is over conservative. For
            // example in the case of `FORMAT BYTES ENVELOPE NONE` the output is indeed
            // non-nullable but we will mark it as nullable anyway. This kind of crude reasoning
            // should be replaced with precise type-level reasoning.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

    // KEY VALUE load generators are the only UPSERT source that
    // has no encoding but defaults to `INCLUDE KEY`.
    //
    // As discussed
    // <https://github.com/MaterializeInc/materialize/pull/26246#issuecomment-2023558097>,
    // removing this special case amounts to deciding how to handle null keys
    // from sources, in a holistic way. We aren't yet prepared to do this, so we leave
    // this special case in.
    //
    // Note that this is safe because this generator is
    // 1. The only source with no encoding that can have its key included.
    // 2. Never produces null keys (or values, for that matter).
    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    // Not all source envelopes are compatible with all source connections.
    // Whoever constructs the source ingestion pipeline is responsible for
    // choosing compatible envelopes and connections.
    //
    // TODO(guswynn): ambiguously assert which connections and envelopes are
    // compatible in typechecking
    //
    // TODO: remove bails as more support for upsert is added.
    let envelope = match &envelope {
        // TODO: fixup key envelope
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            //TODO check that key envelope is not set
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    // With an Avro value format, report the precise type error;
                    // otherwise report the format requirement.
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            // `ENVELOPE UPSERT` implies `INCLUDE KEY`, if it is not explicitly
            // specified.
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            // If the value decode error policy is not set we use the default upsert style.
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        // Decode errors land in a column named by the alias,
                        // or "error" by default.
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            //TODO check that key envelope is not set
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    // Append the requested metadata columns and let the envelope compute the
    // final relation description.
    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}
1469
1470/// Plans the RelationDesc for a source export (subsource or table) that has a defined list
1471/// of columns and constraints.
1472fn plan_source_export_desc(
1473    scx: &StatementContext,
1474    name: &UnresolvedItemName,
1475    columns: &Vec<ColumnDef<Aug>>,
1476    constraints: &Vec<TableConstraint<Aug>>,
1477) -> Result<RelationDesc, PlanError> {
1478    let names: Vec<_> = columns
1479        .iter()
1480        .map(|c| normalize::column_name(c.name.clone()))
1481        .collect();
1482
1483    if let Some(dup) = names.iter().duplicates().next() {
1484        sql_bail!("column {} specified more than once", dup.quoted());
1485    }
1486
1487    // Build initial relation type that handles declared data types
1488    // and NOT NULL constraints.
1489    let mut column_types = Vec::with_capacity(columns.len());
1490    let mut keys = Vec::new();
1491
1492    for (i, c) in columns.into_iter().enumerate() {
1493        let aug_data_type = &c.data_type;
1494        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1495        let mut nullable = true;
1496        for option in &c.options {
1497            match &option.option {
1498                ColumnOption::NotNull => nullable = false,
1499                ColumnOption::Default(_) => {
1500                    bail_unsupported!("Source export with default value")
1501                }
1502                ColumnOption::Unique { is_primary } => {
1503                    keys.push(vec![i]);
1504                    if *is_primary {
1505                        nullable = false;
1506                    }
1507                }
1508                other => {
1509                    bail_unsupported!(format!("Source export with column constraint: {}", other))
1510                }
1511            }
1512        }
1513        column_types.push(ty.nullable(nullable));
1514    }
1515
1516    let mut seen_primary = false;
1517    'c: for constraint in constraints {
1518        match constraint {
1519            TableConstraint::Unique {
1520                name: _,
1521                columns,
1522                is_primary,
1523                nulls_not_distinct,
1524            } => {
1525                if seen_primary && *is_primary {
1526                    sql_bail!(
1527                        "multiple primary keys for source export {} are not allowed",
1528                        name.to_ast_string_stable()
1529                    );
1530                }
1531                seen_primary = *is_primary || seen_primary;
1532
1533                let mut key = vec![];
1534                for column in columns {
1535                    let column = normalize::column_name(column.clone());
1536                    match names.iter().position(|name| *name == column) {
1537                        None => sql_bail!("unknown column in constraint: {}", column),
1538                        Some(i) => {
1539                            let nullable = &mut column_types[i].nullable;
1540                            if *is_primary {
1541                                if *nulls_not_distinct {
1542                                    sql_bail!(
1543                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1544                                    );
1545                                }
1546                                *nullable = false;
1547                            } else if !(*nulls_not_distinct || !*nullable) {
1548                                // Non-primary key unique constraints are only keys if all of their
1549                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
1550                                break 'c;
1551                            }
1552
1553                            key.push(i);
1554                        }
1555                    }
1556                }
1557
1558                if *is_primary {
1559                    keys.insert(0, key);
1560                } else {
1561                    keys.push(key);
1562                }
1563            }
1564            TableConstraint::ForeignKey { .. } => {
1565                bail_unsupported!("Source export with a foreign key")
1566            }
1567            TableConstraint::Check { .. } => {
1568                bail_unsupported!("Source export with a check constraint")
1569            }
1570        }
1571    }
1572
1573    let typ = SqlRelationType::new(column_types).with_keys(keys);
1574    let desc = RelationDesc::new(typ, names);
1575    Ok(desc)
1576}
1577
// WITH-option parser for `CREATE SUBSOURCE`, generated by
// `generate_extracted_config!`: yields `CreateSubsourceOptionExtracted` with
// one typed field per option (plus a `seen` set of the options specified).
generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);
1587
/// Plans `CREATE SUBSOURCE`.
///
/// A subsource is either a source's progress collection (the `PROGRESS`
/// option) or an export of one external reference of a source (`OF SOURCE`
/// plus the `EXTERNAL REFERENCE` option). Purification generates these
/// statements, so invalid option combinations are internal errors rather
/// than user errors.
pub fn plan_create_subsource(
    scx: &StatementContext,
    stmt: CreateSubsourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSubsourceStatement {
        name,
        columns,
        of_source,
        constraints,
        if_not_exists,
        with_options,
    } = &stmt;

    let CreateSubsourceOptionExtracted {
        progress,
        retain_history,
        external_reference,
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    // This invariant is enforced during purification; we are responsible for
    // creating the AST for subsources as a response to CREATE SOURCE
    // statements, so this would fire in integration testing if we failed to
    // uphold it.
    if !(progress ^ (external_reference.is_some() && of_source.is_some())) {
        bail_internal!(
            "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
        );
    }

    let desc = plan_source_export_desc(scx, name, columns, constraints)?;

    let data_source = if let Some(source_reference) = of_source {
        // If the new source table syntax is forced we should not be creating any non-progress
        // subsources.
        if scx.catalog.system_vars().enable_create_table_from_source()
            && scx.catalog.system_vars().force_source_table_syntax()
        {
            Err(PlanError::UseTablesForSources(
                "CREATE SUBSOURCE".to_string(),
            ))?;
        }

        // This is a subsource with the "natural" dependency order, i.e. it is
        // not a legacy subsource with the inverted structure.
        let ingestion_id = *source_reference.item_id();
        let external_reference = external_reference.ok_or_else(|| {
            sql_err!("CREATE SUBSOURCE with REFERENCES requires EXTERNAL REFERENCE option")
        })?;

        // Decode the details option stored on the subsource statement, which contains information
        // created during the purification process.
        // The value is hex-encoded protobuf, decoded in stages:
        // hex -> bytes -> proto message -> native representation.
        let details = details
            .as_ref()
            .ok_or_else(|| internal_err!("source-export subsource missing details"))?;
        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
        let details =
            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
        let details =
            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
        // Translate the statement-level details into the source-type-specific
        // export details, folding in the TEXT/EXCLUDE COLUMNS options.
        let details = match details {
            SourceExportStatementDetails::Postgres { table } => {
                SourceExportDetails::Postgres(PostgresSourceExportDetails {
                    column_casts: crate::pure::postgres::generate_column_casts(
                        scx,
                        &table,
                        &text_columns,
                    )?,
                    table,
                })
            }
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
                table,
                initial_gtid_set,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
                capture_instance,
                table,
                initial_lsn,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::LoadGenerator { output } => {
                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
            }
            SourceExportStatementDetails::Kafka {} => {
                bail_unsupported!("subsources cannot reference Kafka sources")
            }
        };
        DataSourceDesc::IngestionExport {
            ingestion_id,
            external_reference,
            details,
            // Subsources don't currently support non-default envelopes / encoding
            data_config: SourceExportDataConfig {
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
                encoding: None,
            },
        }
    } else if progress {
        DataSourceDesc::Progress
    } else {
        sql_bail!("CREATE SUBSOURCE must specify one of PROGRESS or REFERENCES option")
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Normalize the statement text before storing it as the item's `create_sql`.
    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline: Timeline::EpochMilliseconds,
        in_cluster: None,
    }))
}
1735
// WITH-option parser for `CREATE TABLE ... FROM SOURCE`, generated by
// `generate_extracted_config!`: yields `TableFromSourceOptionExtracted`.
generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);
1744
/// Plans `CREATE TABLE ... FROM SOURCE`.
///
/// Gated on the `enable_create_table_from_source` system variable. Like an
/// export subsource, the table's purification-produced `DETAILS` option is
/// decoded and translated into source-type-specific export details; unlike
/// subsources, tables may specify an envelope, format, and `INCLUDE`
/// metadata (Kafka / load-generator sources only).
pub fn plan_create_table_from_source(
    scx: &StatementContext,
    stmt: CreateTableFromSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if !scx.catalog.system_vars().enable_create_table_from_source() {
        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
    }

    let CreateTableFromSourceStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        source,
        external_reference,
        envelope,
        format,
        include_metadata,
        with_options,
    } = &stmt;

    // Default to ENVELOPE NONE when no envelope is specified.
    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    let TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        retain_history,
        partition_by,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    let source_item = scx.get_item_by_resolved_name(source)?;
    let ingestion_id = source_item.id();

    // Decode the details option stored on the statement, which contains information
    // created during the purification process.
    // The value is hex-encoded protobuf, decoded in stages:
    // hex -> bytes -> proto message -> native representation.
    let details = details
        .as_ref()
        .ok_or_else(|| internal_err!("source-export missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details =
        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

    // `INCLUDE HEADERS` and other `INCLUDE` metadata are only meaningful for
    // Kafka (and, for some metadata, load-generator) sources.
    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        // TODO(guswynn): should this be `bail_unsupported!`?
        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
    }
    if !matches!(
        details,
        SourceExportStatementDetails::Kafka { .. }
            | SourceExportStatementDetails::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
    }

    // Translate statement-level details into source-type-specific export
    // details, folding in TEXT/EXCLUDE COLUMNS and (for Kafka) the requested
    // metadata columns.
    let details = match details {
        SourceExportStatementDetails::Postgres { table } => {
            SourceExportDetails::Postgres(PostgresSourceExportDetails {
                column_casts: crate::pure::postgres::generate_column_casts(
                    scx,
                    &table,
                    &text_columns,
                )?,
                table,
            })
        }
        SourceExportStatementDetails::MySql {
            table,
            initial_gtid_set,
        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
            table,
            initial_gtid_set,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::SqlServer {
            table,
            capture_instance,
            initial_lsn,
        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
            table,
            capture_instance,
            initial_lsn,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::LoadGenerator { output } => {
            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
        }
        SourceExportStatementDetails::Kafka {} => {
            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                // TODO(guswynn): should this be `bail_unsupported!`?
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            // Map each INCLUDE item to a (column name, metadata kind) pair,
            // using the alias if provided and a fixed default name otherwise.
            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
                        // handled below
                        None
                    }
                })
                .collect();

            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
        }
    };

    let source_connection = &source_item
        .source_desc()?
        .ok_or_else(|| sql_err!("item is not a source"))?
        .connection;

    // Some source-types (e.g. postgres, mysql, multi-output load-gen sources) define a value_schema
    // during purification and define the `columns` and `constraints` fields for the statement,
    // whereas other source-types (e.g. kafka, single-output load-gen sources) do not, so instead
    // we use the source connection's default schema.
    let (key_desc, value_desc) =
        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
            let columns = match columns {
                TableFromSourceColumns::Defined(columns) => columns,
                _ => bail_internal!("expected column definitions to be present"),
            };
            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
            (None, desc)
        } else {
            let key_desc = source_connection.default_key_desc();
            let value_desc = source_connection.default_value_desc();
            (Some(key_desc), value_desc)
        };

    // Kafka metadata columns are appended to the relation by the envelope below.
    let metadata_columns_desc = match &details {
        SourceExportDetails::Kafka(KafkaSourceExportDetails {
            metadata_columns, ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        key_desc,
        value_desc,
        include_metadata,
        metadata_columns_desc,
        source_connection,
    )?;
    // `CREATE TABLE t (a, b) FROM SOURCE ...` renames the columns in place.
    if let TableFromSourceColumns::Named(col_names) = columns {
        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
    }

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Allow users to specify a timeline. If they do not, determine a default
    // timeline for the source.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    // PARTITION BY is feature-gated and must reference a prefix-compatible
    // set of columns; `check_partition_by` enforces the details.
    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let data_source = DataSourceDesc::IngestionExport {
        ingestion_id,
        // Populated during purification.
        external_reference: external_reference
            .as_ref()
            .ok_or_else(|| sql_err!("EXTERNAL REFERENCE is required"))?
            .clone(),
        details,
        data_config: SourceExportDataConfig { envelope, encoding },
    };

    let if_not_exists = *if_not_exists;

    // Normalize the statement text before storing it as the item's `create_sql`.
    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let table = Table {
        create_sql,
        desc: VersionedRelationDesc::new(desc),
        temporary: false,
        compaction_window,
        data_source: TableDataSource::DataSource {
            desc: data_source,
            timeline,
        },
    };

    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists,
    }))
}
2012
// Option parser for `CREATE SOURCE ... FROM LOAD GENERATOR`, generated by
// `generate_extracted_config!`; which options are valid for which generator
// variant is enforced separately by `ensure_only_valid_options`.
generate_extracted_config!(
    LoadGeneratorOption,
    (TickInterval, Duration),
    (AsOf, u64, Default(0_u64)),
    (UpTo, u64, Default(u64::MAX)),
    (ScaleFactor, f64),
    (MaxCardinality, u64),
    (Keys, u64),
    (SnapshotRounds, u64),
    (TransactionalSnapshot, bool),
    (ValueSize, u64),
    (Seed, u64),
    (Partitions, u64),
    (BatchSize, u64)
);
2028
2029impl LoadGeneratorOptionExtracted {
2030    pub(super) fn ensure_only_valid_options(
2031        &self,
2032        loadgen: &ast::LoadGenerator,
2033    ) -> Result<(), PlanError> {
2034        use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2035
2036        let mut options = self.seen.clone();
2037
2038        let permitted_options: &[_] = match loadgen {
2039            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2040            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2041            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2042            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2043            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2044            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2045            ast::LoadGenerator::KeyValue => &[
2046                TickInterval,
2047                Keys,
2048                SnapshotRounds,
2049                TransactionalSnapshot,
2050                ValueSize,
2051                Seed,
2052                Partitions,
2053                BatchSize,
2054            ],
2055        };
2056
2057        for o in permitted_options {
2058            options.remove(o);
2059        }
2060
2061        if !options.is_empty() {
2062            sql_bail!(
2063                "{} load generators do not support {} values",
2064                loadgen,
2065                options.iter().join(", ")
2066            )
2067        }
2068
2069        Ok(())
2070    }
2071}
2072
/// Converts a parsed `LOAD GENERATOR` variant plus its options and `INCLUDE`
/// items into the planned [`LoadGenerator`], checking feature flags and
/// validating option values along the way.
pub(crate) fn load_generator_ast_to_generator(
    scx: &StatementContext,
    loadgen: &ast::LoadGenerator,
    options: &[LoadGeneratorOption<Aug>],
    include_metadata: &[SourceIncludeMetadata],
) -> Result<LoadGenerator, PlanError> {
    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
    // Reject options not valid for this generator variant.
    extracted.ensure_only_valid_options(loadgen)?;

    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
        sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
    }

    let load_generator = match loadgen {
        ast::LoadGenerator::Auction => LoadGenerator::Auction,
        ast::LoadGenerator::Clock => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
            LoadGenerator::Clock
        }
        ast::LoadGenerator::Counter => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
            let LoadGeneratorOptionExtracted {
                max_cardinality, ..
            } = extracted;
            LoadGenerator::Counter { max_cardinality }
        }
        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
        ast::LoadGenerator::Datums => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
            LoadGenerator::Datums
        }
        ast::LoadGenerator::Tpch => {
            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;

            // Default to 0.01 scale factor (=10MB).
            let sf: f64 = scale_factor.unwrap_or(0.01);
            if !sf.is_finite() || sf < 0.0 {
                sql_bail!("unsupported scale factor {sf}");
            }

            // Scales a TPC-H base row count by `sf`, clamping to at least 1
            // and erroring if the result does not fit in an i64.
            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
                let total = (sf * multiplier).floor();
                let mut i = i64::try_cast_from(total)
                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
                if i < 1 {
                    i = 1;
                }
                Ok(i)
            };

            // The multiplications here are safely unchecked because they will
            // overflow to infinity, which will be caught by f64_to_i64.
            let count_supplier = f_to_i(10_000f64)?;
            let count_part = f_to_i(200_000f64)?;
            let count_customer = f_to_i(150_000f64)?;
            let count_orders = f_to_i(150_000f64 * 10f64)?;
            let count_clerk = f_to_i(1_000f64)?;

            LoadGenerator::Tpch {
                count_supplier,
                count_part,
                count_customer,
                count_orders,
                count_clerk,
            }
        }
        mz_sql_parser::ast::LoadGenerator::KeyValue => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
            let LoadGeneratorOptionExtracted {
                keys,
                snapshot_rounds,
                transactional_snapshot,
                value_size,
                tick_interval,
                seed,
                partitions,
                batch_size,
                ..
            } = extracted;

            // KEY VALUE only supports INCLUDE OFFSET (with optional alias)
            // and INCLUDE KEY; anything else is rejected.
            let mut include_offset = None;
            for im in include_metadata {
                match im {
                    SourceIncludeMetadata::Offset { alias } => {
                        include_offset = match alias {
                            Some(alias) => Some(alias.to_string()),
                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
                        }
                    }
                    SourceIncludeMetadata::Key { .. } => continue,

                    _ => {
                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
                    }
                };
            }

            // All of these options are required; only TRANSACTIONAL SNAPSHOT
            // and TICK INTERVAL have defaults.
            let lgkv = KeyValueLoadGenerator {
                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
                snapshot_rounds: snapshot_rounds
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
                // Defaults to true.
                transactional_snapshot: transactional_snapshot.unwrap_or(true),
                value_size: value_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
                partitions: partitions
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
                tick_interval,
                batch_size: batch_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
                include_offset,
            };

            if lgkv.keys == 0
                || lgkv.partitions == 0
                || lgkv.value_size == 0
                || lgkv.batch_size == 0
            {
                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
            }

            if lgkv.keys % lgkv.partitions != 0 {
                sql_bail!("KEYS must be a multiple of PARTITIONS")
            }

            if lgkv.batch_size > lgkv.keys {
                sql_bail!("KEYS must be larger than BATCH SIZE")
            }

            // This constraints simplifies the source implementation.
            // We can lift it later.
            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
            }

            if lgkv.snapshot_rounds == 0 {
                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
            }

            LoadGenerator::KeyValue(lgkv)
        }
    };

    Ok(load_generator)
}
2219
2220fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2221    let before = value_desc.get_by_name(&"before".into());
2222    let (after_idx, after_ty) = value_desc
2223        .get_by_name(&"after".into())
2224        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2225    let before_idx = if let Some((before_idx, before_ty)) = before {
2226        if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2227            sql_bail!("'before' column must be of type record");
2228        }
2229        if before_ty != after_ty {
2230            sql_bail!("'before' type differs from 'after' column");
2231        }
2232        Some(before_idx)
2233    } else {
2234        None
2235    };
2236    Ok((before_idx, after_idx))
2237}
2238
2239fn get_encoding(
2240    scx: &StatementContext,
2241    format: &FormatSpecifier<Aug>,
2242    envelope: &ast::SourceEnvelope,
2243) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2244    let encoding = match format {
2245        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2246        FormatSpecifier::KeyValue { key, value } => {
2247            let key = {
2248                let encoding = get_encoding_inner(scx, key)?;
2249                Some(encoding.key.unwrap_or(encoding.value))
2250            };
2251            let value = get_encoding_inner(scx, value)?.value;
2252            SourceDataEncoding { key, value }
2253        }
2254    };
2255
2256    let requires_keyvalue = matches!(
2257        envelope,
2258        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2259    );
2260    let is_keyvalue = encoding.key.is_some();
2261    if requires_keyvalue && !is_keyvalue {
2262        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2263    };
2264
2265    Ok(encoding)
2266}
2267
2268/// Determine the cluster ID to use for this item.
2269///
2270/// If `in_cluster` is `None` we will update it to refer to the default cluster.
2271/// Because of this, do not normalize/canonicalize the create SQL statement
2272/// until after calling this function.
2273fn source_sink_cluster_config<'a, 'ctx>(
2274    scx: &'a StatementContext<'ctx>,
2275    in_cluster: &mut Option<ResolvedClusterName>,
2276) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2277    let cluster = match in_cluster {
2278        None => {
2279            let cluster = scx.catalog.resolve_cluster(None)?;
2280            *in_cluster = Some(ResolvedClusterName {
2281                id: cluster.id(),
2282                print_name: None,
2283            });
2284            cluster
2285        }
2286        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2287    };
2288
2289    Ok(cluster)
2290}
2291
// Extracts `WITH (...)` options for inline Avro schemas; the sole option,
// `CONFLUENT WIRE FORMAT`, defaults to `true`.
generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2293
/// The key and value schemas for a format, together with their resolved
/// reference schemas and the schema registry connection they were fetched
/// from, if any.
#[derive(Debug)]
pub struct Schema {
    /// The key schema, if the format has a key component.
    pub key_schema: Option<String>,
    /// The value schema.
    pub value_schema: String,
    /// Reference schemas for the key schema, in dependency order.
    pub key_reference_schemas: Vec<String>,
    /// Reference schemas for the value schema, in dependency order.
    pub value_reference_schemas: Vec<String>,
    /// The schema registry connection these schemas were seeded from;
    /// `None` for inline schemas.
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    /// Whether messages are framed with the Confluent wire format header.
    pub confluent_wire_format: bool,
}
2305
/// Plans a single `FORMAT` clause into a source data encoding.
///
/// Most formats produce only a value encoding (`key: None`). Avro and
/// Protobuf formats backed by a schema registry return early with both a key
/// and a value encoding when a key schema is present in the seed.
fn get_encoding_inner(
    scx: &StatementContext,
    format: &Format<Aug>,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let value = match format {
        Format::Bytes => DataEncoding::Bytes,
        Format::Avro(schema) => {
            let Schema {
                key_schema,
                value_schema,
                key_reference_schemas,
                value_reference_schemas,
                csr_connection,
                confluent_wire_format,
            } = match schema {
                // TODO(jldlaughlin): we need a way to pass in primary key information
                // when building a source from a string or file.
                AvroSchema::InlineSchema {
                    schema: ast::Schema { schema },
                    with_options,
                } => {
                    let AvroSchemaOptionExtracted {
                        confluent_wire_format,
                        ..
                    } = with_options.clone().try_into()?;

                    // Inline schemas carry no key component, no reference
                    // schemas, and no registry connection.
                    Schema {
                        key_schema: None,
                        value_schema: schema.clone(),
                        key_reference_schemas: vec![],
                        value_reference_schemas: vec![],
                        csr_connection: None,
                        confluent_wire_format,
                    }
                }
                AvroSchema::Csr {
                    csr_connection:
                        CsrConnectionAvro {
                            connection,
                            seed,
                            key_strategy: _,
                            value_strategy: _,
                        },
                } => {
                    // Validate that the referenced connection really is a
                    // schema registry connection before using its ID.
                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
                    let csr_connection = match item.connection()? {
                        Connection::Csr(_) => item.id(),
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    // The seed is filled in during purification; its absence
                    // here is an internal error.
                    if let Some(seed) = seed {
                        Schema {
                            key_schema: seed.key_schema.clone(),
                            value_schema: seed.value_schema.clone(),
                            key_reference_schemas: seed.key_reference_schemas.clone(),
                            value_reference_schemas: seed.value_reference_schemas.clone(),
                            csr_connection: Some(csr_connection),
                            confluent_wire_format: true,
                        }
                    } else {
                        sql_bail!("Avro CSR seed resolution has not been performed")
                    }
                }
            };

            // A key schema means this format is inherently key/value: return
            // both encodings directly rather than falling through to the
            // value-only path at the bottom of this function.
            if let Some(key_schema) = key_schema {
                return Ok(SourceDataEncoding {
                    key: Some(DataEncoding::Avro(AvroEncoding {
                        schema: key_schema,
                        reference_schemas: key_reference_schemas,
                        csr_connection: csr_connection.clone(),
                        confluent_wire_format,
                    })),
                    value: DataEncoding::Avro(AvroEncoding {
                        schema: value_schema,
                        reference_schemas: value_reference_schemas,
                        csr_connection,
                        confluent_wire_format,
                    }),
                });
            } else {
                DataEncoding::Avro(AvroEncoding {
                    schema: value_schema,
                    reference_schemas: value_reference_schemas,
                    csr_connection,
                    confluent_wire_format,
                })
            }
        }
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr {
                csr_connection:
                    CsrConnectionProtobuf {
                        connection:
                            CsrConnection {
                                connection,
                                options,
                            },
                        seed,
                    },
            } => {
                // As with Avro, the seed must have been resolved during
                // purification.
                if let Some(CsrSeedProtobuf { key, value }) = seed {
                    // Validate the connection type; the connection itself is
                    // not otherwise used here (descriptors come from the seed).
                    let item = scx.get_item_by_resolved_name(connection)?;
                    let _ = match item.connection()? {
                        Connection::Csr(connection) => connection,
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if !options.is_empty() {
                        sql_bail!("Protobuf CSR connections do not support any options");
                    }

                    let value = DataEncoding::Protobuf(ProtobufEncoding {
                        descriptors: strconv::parse_bytes(&value.schema)?,
                        message_name: value.message_name.clone(),
                        confluent_wire_format: true,
                    });
                    // A key seed means this is a key/value format: return
                    // early with both encodings.
                    if let Some(key) = key {
                        return Ok(SourceDataEncoding {
                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
                                descriptors: strconv::parse_bytes(&key.schema)?,
                                message_name: key.message_name.clone(),
                                confluent_wire_format: true,
                            })),
                            value,
                        });
                    }
                    value
                } else {
                    sql_bail!("Protobuf CSR seed resolution has not been performed")
                }
            }
            ProtobufSchema::InlineSchema {
                message_name,
                schema: ast::Schema { schema },
            } => {
                let descriptors = strconv::parse_bytes(schema)?;

                DataEncoding::Protobuf(ProtobufEncoding {
                    descriptors,
                    message_name: message_name.to_owned(),
                    confluent_wire_format: false,
                })
            }
        },
        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
            regex: mz_repr::adt::regex::Regex::new(regex, false)
                .map_err(|e| sql_err!("parsing regex: {e}"))?,
        }),
        Format::Csv { columns, delimiter } => {
            let columns = match columns {
                CsvColumns::Header { names } => {
                    // Purification replaces a bare HEADER with concrete names.
                    if names.is_empty() {
                        sql_bail!("[internal error] column spec should get names in purify")
                    }
                    ColumnSpec::Header {
                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
                    }
                }
                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
            };
            DataEncoding::Csv(CsvEncoding {
                columns,
                delimiter: u8::try_from(*delimiter)
                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
            })
        }
        Format::Json { array: false } => DataEncoding::Json,
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
        Format::Text => DataEncoding::Text,
    };
    Ok(SourceDataEncoding { key: None, value })
}
2495
2496/// Extract the key envelope, if it is requested
2497fn get_key_envelope(
2498    included_items: &[SourceIncludeMetadata],
2499    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2500    key_envelope_no_encoding: bool,
2501) -> Result<KeyEnvelope, PlanError> {
2502    let key_definition = included_items
2503        .iter()
2504        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2505    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2506        match (alias, encoding.and_then(|e| e.key.as_ref())) {
2507            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2508            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2509            (Some(name), _) if key_envelope_no_encoding => {
2510                Ok(KeyEnvelope::Named(name.as_str().to_string()))
2511            }
2512            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2513            (_, None) => {
2514                // `kd.alias` == `None` means `INCLUDE KEY`
2515                // `kd.alias` == `Some(_) means INCLUDE KEY AS ___`
2516                // These both make sense with the same error message
2517                sql_bail!(
2518                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2519                        got bare FORMAT"
2520                );
2521            }
2522        }
2523    } else {
2524        Ok(KeyEnvelope::None)
2525    }
2526}
2527
2528/// Gets the key envelope for a given key encoding when no name for the key has
2529/// been requested by the user.
2530fn get_unnamed_key_envelope(
2531    key: Option<&DataEncoding<ReferencedConnection>>,
2532) -> Result<KeyEnvelope, PlanError> {
2533    // If the key is requested but comes from an unnamed type then it gets the name "key"
2534    //
2535    // Otherwise it gets the names of the columns in the type
2536    let is_composite = match key {
2537        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2538        Some(
2539            DataEncoding::Avro(_)
2540            | DataEncoding::Csv(_)
2541            | DataEncoding::Protobuf(_)
2542            | DataEncoding::Regex { .. },
2543        ) => true,
2544        None => false,
2545    };
2546
2547    if is_composite {
2548        Ok(KeyEnvelope::Flattened)
2549    } else {
2550        Ok(KeyEnvelope::Named("key".to_string()))
2551    }
2552}
2553
2554pub fn describe_create_view(
2555    _: &StatementContext,
2556    _: CreateViewStatement<Aug>,
2557) -> Result<StatementDesc, PlanError> {
2558    Ok(StatementDesc::new(None))
2559}
2560
/// Plans the shared portion of `CREATE [TEMPORARY] VIEW`: normalizes the
/// create SQL, plans the defining query, resolves the view's qualified name,
/// applies any user-specified column renames, and rejects parameters and
/// duplicate column names.
pub fn plan_view(
    scx: &StatementContext,
    def: &mut ViewDefinition<Aug>,
    temporary: bool,
) -> Result<(QualifiedItemName, View), PlanError> {
    // Normalize the statement up front so the stored create SQL matches what
    // the user wrote (modulo normalization), independent of planning below.
    let create_sql = normalize::create_statement(
        scx,
        Statement::CreateView(CreateViewStatement {
            if_exists: IfExistsBehavior::Error,
            temporary,
            definition: def.clone(),
        }),
    )?;

    let ViewDefinition {
        name,
        columns,
        query,
    } = def;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
    // We get back a trivial finishing, because `plan_root_query` applies the given finishing.
    // Note: Earlier, we were thinking to maybe persist the finishing information with the view
    // here to help with database-issues#236. However, in the meantime, there might be a better
    // approach to solve database-issues#236:
    // https://github.com/MaterializeInc/database-issues/issues/236#issuecomment-1688293709
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    // Views are persistent; they cannot reference session parameters.
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed("views".to_string()));
    }

    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    // Temporary views live in the session's temporary schema.
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Apply any `CREATE VIEW v (a, b, ...)` column renames.
    plan_utils::maybe_rename_columns(
        format!("view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        columns,
    )?;
    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let view = View {
        create_sql,
        expr,
        dependencies,
        column_names: names,
        temporary,
    };

    Ok((name, view))
}
2633
/// Plans `CREATE [OR REPLACE] VIEW`, handling the `OR REPLACE` drop logic,
/// dependency validation, and name-collision checks on top of `plan_view`.
pub fn plan_create_view(
    scx: &StatementContext,
    mut stmt: CreateViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateViewStatement {
        temporary,
        if_exists,
        definition,
    } = &mut stmt;
    let (name, view) = plan_view(scx, definition, *temporary)?;

    // Override the statement-level IfExistsBehavior with Skip if this is
    // explicitly requested in the PlanContext (the default is `false`).
    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);

    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
        let if_exists = true;
        let cascade = false;
        let maybe_item_to_drop = plan_drop_item(
            scx,
            ObjectType::View,
            if_exists,
            definition.name.clone(),
            cascade,
        )?;

        // Check if the new View depends on the item that we would be replacing.
        if let Some(id) = maybe_item_to_drop {
            let dependencies = view.expr.depends_on();
            let invalid_drop = scx
                .get_item(&id)
                .global_ids()
                .any(|gid| dependencies.contains(&gid));
            if invalid_drop {
                let item = scx.catalog.get_item(&id);
                sql_bail!(
                    "cannot replace view {0}: depended upon by new {0} definition",
                    scx.catalog.resolve_full_name(item.name())
                );
            }

            Some(id)
        } else {
            None
        }
    } else {
        None
    };
    // Everything transitively dependent on the replaced view must be dropped
    // with it.
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();

    validate_view_dependencies(scx, &view.dependencies.0)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating views when
    // there is an existing object *or* type of the same name.
    if let (Ok(item), IfExistsBehavior::Error, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_exists,
        ignore_if_exists_errors,
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateView(CreateViewPlan {
        name,
        view,
        replace,
        drop_ids,
        if_not_exists: *if_exists == IfExistsBehavior::Skip,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}
2719
2720/// Validate the dependencies of a (materialized) view.
2721fn validate_view_dependencies(
2722    scx: &StatementContext,
2723    dependencies: &BTreeSet<CatalogItemId>,
2724) -> Result<(), PlanError> {
2725    for id in dependencies {
2726        let item = scx.catalog.get_item(id);
2727        if item.replacement_target().is_some() {
2728            let name = scx.catalog.minimal_qualification(item.name());
2729            return Err(PlanError::InvalidDependency {
2730                name: name.to_string(),
2731                item_type: format!("replacement {}", item.item_type()),
2732            });
2733        }
2734    }
2735
2736    Ok(())
2737}
2738
2739pub fn describe_create_materialized_view(
2740    _: &StatementContext,
2741    _: CreateMaterializedViewStatement<Aug>,
2742) -> Result<StatementDesc, PlanError> {
2743    Ok(StatementDesc::new(None))
2744}
2745
2746pub fn describe_create_network_policy(
2747    _: &StatementContext,
2748    _: CreateNetworkPolicyStatement<Aug>,
2749) -> Result<StatementDesc, PlanError> {
2750    Ok(StatementDesc::new(None))
2751}
2752
2753pub fn describe_alter_network_policy(
2754    _: &StatementContext,
2755    _: AlterNetworkPolicyStatement<Aug>,
2756) -> Result<StatementDesc, PlanError> {
2757    Ok(StatementDesc::new(None))
2758}
2759
2760pub fn plan_create_materialized_view(
2761    scx: &StatementContext,
2762    mut stmt: CreateMaterializedViewStatement<Aug>,
2763) -> Result<Plan, PlanError> {
2764    let cluster_id =
2765        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2766    stmt.in_cluster = Some(ResolvedClusterName {
2767        id: cluster_id,
2768        print_name: None,
2769    });
2770
2771    let target_replica = match &stmt.in_cluster_replica {
2772        Some(replica_name) => {
2773            scx.require_feature_flag(&ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS)?;
2774
2775            let cluster = scx.catalog.get_cluster(cluster_id);
2776            let replica_id = cluster
2777                .replica_ids()
2778                .get(replica_name.as_str())
2779                .copied()
2780                .ok_or_else(|| {
2781                    CatalogError::UnknownClusterReplica(replica_name.as_str().to_string())
2782                })?;
2783            Some(replica_id)
2784        }
2785        None => None,
2786    };
2787
2788    let create_sql =
2789        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2790
2791    let partial_name = normalize::unresolved_item_name(stmt.name)?;
2792    let name = scx.allocate_qualified_name(partial_name.clone())?;
2793
2794    let query::PlannedRootQuery {
2795        expr,
2796        mut desc,
2797        finishing,
2798        scope: _,
2799    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2800    // We get back a trivial finishing, see comment in `plan_view`.
2801    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2802        &finishing,
2803        expr.arity()
2804    ));
2805    if expr.contains_parameters()? {
2806        return Err(PlanError::ParameterNotAllowed(
2807            "materialized views".to_string(),
2808        ));
2809    }
2810
2811    plan_utils::maybe_rename_columns(
2812        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2813        &mut desc,
2814        &stmt.columns,
2815    )?;
2816    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2817
2818    let MaterializedViewOptionExtracted {
2819        assert_not_null,
2820        partition_by,
2821        retain_history,
2822        refresh,
2823        seen: _,
2824    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2825
2826    if let Some(partition_by) = partition_by {
2827        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2828        check_partition_by(&desc, partition_by)?;
2829    }
2830
2831    let refresh_schedule = {
2832        let mut refresh_schedule = RefreshSchedule::default();
2833        let mut on_commits_seen = 0;
2834        for refresh_option_value in refresh {
2835            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2836                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2837            }
2838            match refresh_option_value {
2839                RefreshOptionValue::OnCommit => {
2840                    on_commits_seen += 1;
2841                }
2842                RefreshOptionValue::AtCreation => {
2843                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2844                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
2845                }
2846                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
2847                    transform_ast::transform(scx, &mut time)?; // Desugar the expression
2848                    let ecx = &ExprContext {
2849                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2850                        name: "REFRESH AT",
2851                        scope: &Scope::empty(),
2852                        relation_type: &SqlRelationType::empty(),
2853                        allow_aggregates: false,
2854                        allow_subqueries: false,
2855                        allow_parameters: false,
2856                        allow_windows: false,
2857                    };
2858                    let hir = plan_expr(ecx, &time)?.cast_to(
2859                        ecx,
2860                        CastContext::Assignment,
2861                        &SqlScalarType::MzTimestamp,
2862                    )?;
2863                    // (mz_now was purified away to a literal earlier)
2864                    let timestamp = hir
2865                        .into_literal_mz_timestamp()
2866                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2867                    refresh_schedule.ats.push(timestamp);
2868                }
2869                RefreshOptionValue::Every(RefreshEveryOptionValue {
2870                    interval,
2871                    aligned_to,
2872                }) => {
2873                    let interval = Interval::try_from_value(Value::Interval(interval))?;
2874                    if interval.as_microseconds() <= 0 {
2875                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
2876                    }
2877                    if interval.months != 0 {
2878                        // This limitation is because we want Intervals to be cleanly convertable
2879                        // to a unix epoch timestamp difference. When the interval involves months, then
2880                        // this is not true anymore, because months have variable lengths.
2881                        // See `Timestamp::round_up`.
2882                        sql_bail!("REFRESH interval must not involve units larger than days");
2883                    }
2884                    let interval = interval.duration()?;
2885                    if u64::try_from(interval.as_millis()).is_err() {
2886                        sql_bail!("REFRESH interval too large");
2887                    }
2888
2889                    let mut aligned_to = match aligned_to {
2890                        Some(aligned_to) => aligned_to,
2891                        None => {
2892                            soft_panic_or_log!(
2893                                "ALIGNED TO should have been filled in by purification"
2894                            );
2895                            sql_bail!(
2896                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2897                            )
2898                        }
2899                    };
2900
2901                    // Desugar the `aligned_to` expression
2902                    transform_ast::transform(scx, &mut aligned_to)?;
2903
2904                    let ecx = &ExprContext {
2905                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2906                        name: "REFRESH EVERY ... ALIGNED TO",
2907                        scope: &Scope::empty(),
2908                        relation_type: &SqlRelationType::empty(),
2909                        allow_aggregates: false,
2910                        allow_subqueries: false,
2911                        allow_parameters: false,
2912                        allow_windows: false,
2913                    };
2914                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2915                        ecx,
2916                        CastContext::Assignment,
2917                        &SqlScalarType::MzTimestamp,
2918                    )?;
2919                    // (mz_now was purified away to a literal earlier)
2920                    let aligned_to_const = aligned_to_hir
2921                        .into_literal_mz_timestamp()
2922                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2923
2924                    refresh_schedule.everies.push(RefreshEvery {
2925                        interval,
2926                        aligned_to: aligned_to_const,
2927                    });
2928                }
2929            }
2930        }
2931
2932        if on_commits_seen > 1 {
2933            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2934        }
2935        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2936            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2937        }
2938
2939        if refresh_schedule == RefreshSchedule::default() {
2940            None
2941        } else {
2942            Some(refresh_schedule)
2943        }
2944    };
2945
2946    let as_of = stmt.as_of.map(Timestamp::from);
2947    let compaction_window = plan_retain_history_option(scx, retain_history)?;
2948    let mut non_null_assertions = assert_not_null
2949        .into_iter()
2950        .map(normalize::column_name)
2951        .map(|assertion_name| {
2952            column_names
2953                .iter()
2954                .position(|col| col == &assertion_name)
2955                .ok_or_else(|| {
2956                    sql_err!(
2957                        "column {} in ASSERT NOT NULL option not found",
2958                        assertion_name.quoted()
2959                    )
2960                })
2961        })
2962        .collect::<Result<Vec<_>, _>>()?;
2963    non_null_assertions.sort();
2964    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2965        let dup = &column_names[*dup];
2966        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2967    }
2968
2969    if let Some(dup) = column_names.iter().duplicates().next() {
2970        sql_bail!("column {} specified more than once", dup.quoted());
2971    }
2972
2973    // Override the statement-level IfExistsBehavior with Skip if this is
2974    // explicitly requested in the PlanContext (the default is `false`).
2975    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
2976        Ok(true) => IfExistsBehavior::Skip,
2977        _ => stmt.if_exists,
2978    };
2979
2980    let mut replace = None;
2981    let mut if_not_exists = false;
2982    match if_exists {
2983        IfExistsBehavior::Replace => {
2984            let if_exists = true;
2985            let cascade = false;
2986            let replace_id = plan_drop_item(
2987                scx,
2988                ObjectType::MaterializedView,
2989                if_exists,
2990                partial_name.clone().into(),
2991                cascade,
2992            )?;
2993
2994            // Check if the new Materialized View depends on the item that we would be replacing.
2995            if let Some(id) = replace_id {
2996                let dependencies = expr.depends_on();
2997                let invalid_drop = scx
2998                    .get_item(&id)
2999                    .global_ids()
3000                    .any(|gid| dependencies.contains(&gid));
3001                if invalid_drop {
3002                    let item = scx.catalog.get_item(&id);
3003                    sql_bail!(
3004                        "cannot replace materialized view {0}: depended upon by new {0} definition",
3005                        scx.catalog.resolve_full_name(item.name())
3006                    );
3007                }
3008                replace = Some(id);
3009            }
3010        }
3011        IfExistsBehavior::Skip => if_not_exists = true,
3012        IfExistsBehavior::Error => (),
3013    }
3014    let drop_ids = replace
3015        .map(|id| {
3016            scx.catalog
3017                .item_dependents(id)
3018                .into_iter()
3019                .map(|id| id.unwrap_item_id())
3020                .collect()
3021        })
3022        .unwrap_or_default();
3023    let mut dependencies: BTreeSet<_> = expr
3024        .depends_on()
3025        .into_iter()
3026        .map(|gid| scx.catalog.resolve_item_id(&gid))
3027        .collect();
3028
3029    // Validate the replacement target, if one is given.
3030    let mut replacement_target = None;
3031    if let Some(target_name) = &stmt.replacement_for {
3032        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
3033
3034        let target = scx.get_item_by_resolved_name(target_name)?;
3035        if target.item_type() != CatalogItemType::MaterializedView {
3036            return Err(PlanError::InvalidReplacement {
3037                item_type: target.item_type(),
3038                item_name: scx.catalog.minimal_qualification(target.name()),
3039                replacement_type: CatalogItemType::MaterializedView,
3040                replacement_name: partial_name,
3041            });
3042        }
3043        if target.id().is_system() {
3044            sql_bail!(
3045                "cannot replace {} because it is required by the database system",
3046                scx.catalog.minimal_qualification(target.name()),
3047            );
3048        }
3049
3050        // Check for dependency cycles.
3051        for dependent in scx.catalog.item_dependents(target.id()) {
3052            if let ObjectId::Item(id) = dependent
3053                && dependencies.contains(&id)
3054            {
3055                sql_bail!(
3056                    "replacement would cause {} to depend on itself",
3057                    scx.catalog.minimal_qualification(target.name()),
3058                );
3059            }
3060        }
3061
3062        dependencies.insert(target.id());
3063
3064        for use_id in target.used_by() {
3065            let use_item = scx.get_item(use_id);
3066            if use_item.replacement_target() == Some(target.id()) {
3067                sql_bail!(
3068                    "cannot replace {} because it already has a replacement: {}",
3069                    scx.catalog.minimal_qualification(target.name()),
3070                    scx.catalog.minimal_qualification(use_item.name()),
3071                );
3072            }
3073        }
3074
3075        replacement_target = Some(target.id());
3076    }
3077
3078    validate_view_dependencies(scx, &dependencies)?;
3079
3080    // Check for an object in the catalog with this same name
3081    let full_name = scx.catalog.resolve_full_name(&name);
3082    let partial_name = PartialItemName::from(full_name.clone());
3083    // For PostgreSQL compatibility, we need to prevent creating materialized
3084    // views when there is an existing object *or* type of the same name.
3085    if let (IfExistsBehavior::Error, Ok(item)) =
3086        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
3087    {
3088        return Err(PlanError::ItemAlreadyExists {
3089            name: full_name.to_string(),
3090            item_type: item.item_type(),
3091        });
3092    }
3093
3094    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
3095        name,
3096        materialized_view: MaterializedView {
3097            create_sql,
3098            expr,
3099            dependencies: DependencyIds(dependencies),
3100            column_names,
3101            replacement_target,
3102            cluster_id,
3103            target_replica,
3104            non_null_assertions,
3105            compaction_window,
3106            refresh_schedule,
3107            as_of,
3108        },
3109        replace,
3110        drop_ids,
3111        if_not_exists,
3112        ambiguous_columns: *scx.ambiguous_columns.borrow(),
3113    }))
3114}
3115
// Extracts the `WITH (...)` options supported by `CREATE MATERIALIZED VIEW`
// into a typed `MaterializedViewOptionExtracted` struct; options marked
// `AllowMultiple` may be specified more than once.
generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);
3123
3124pub fn describe_create_sink(
3125    _: &StatementContext,
3126    _: CreateSinkStatement<Aug>,
3127) -> Result<StatementDesc, PlanError> {
3128    Ok(StatementDesc::new(None))
3129}
3130
// Extracts the `WITH (...)` options supported by `CREATE SINK` into a typed
// `CreateSinkOptionExtracted` struct (consumed in `plan_sink` via `try_into`).
generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);
3138
3139pub fn plan_create_sink(
3140    scx: &StatementContext,
3141    stmt: CreateSinkStatement<Aug>,
3142) -> Result<Plan, PlanError> {
3143    // Check for an object in the catalog with this same name
3144    let Some(name) = stmt.name.clone() else {
3145        return Err(PlanError::MissingName(CatalogItemType::Sink));
3146    };
3147    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3148    let full_name = scx.catalog.resolve_full_name(&name);
3149    let partial_name = PartialItemName::from(full_name.clone());
3150    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3151        return Err(PlanError::ItemAlreadyExists {
3152            name: full_name.to_string(),
3153            item_type: item.item_type(),
3154        });
3155    }
3156
3157    plan_sink(scx, stmt)
3158}
3159
3160/// This function will plan a sink as if it does not exist in the catalog. This is so the planning
3161/// logic is reused by both CREATE SINK and ALTER SINK planning. It is the responsibility of the
3162/// callers (plan_create_sink and plan_alter_sink) to check for name collisions if this is
3163/// important.
3164fn plan_sink(
3165    scx: &StatementContext,
3166    mut stmt: CreateSinkStatement<Aug>,
3167) -> Result<Plan, PlanError> {
3168    let CreateSinkStatement {
3169        name,
3170        in_cluster: _,
3171        from,
3172        connection,
3173        format,
3174        envelope,
3175        mode,
3176        if_not_exists,
3177        with_options,
3178    } = stmt.clone();
3179
3180    let Some(name) = name else {
3181        return Err(PlanError::MissingName(CatalogItemType::Sink));
3182    };
3183    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3184
3185    let envelope = match (&connection, envelope, mode) {
3186        // Kafka sinks use ENVELOPE
3187        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Upsert), None) => {
3188            SinkEnvelope::Upsert
3189        }
3190        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Debezium), None) => {
3191            SinkEnvelope::Debezium
3192        }
3193        (CreateSinkConnection::Kafka { .. }, None, None) => {
3194            sql_bail!("ENVELOPE clause is required")
3195        }
3196        (CreateSinkConnection::Kafka { .. }, _, Some(_)) => {
3197            sql_bail!("MODE is not supported for Kafka sinks, use ENVELOPE instead")
3198        }
3199        // Iceberg sinks use MODE
3200        (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Upsert)) => {
3201            SinkEnvelope::Upsert
3202        }
3203        (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Append)) => {
3204            SinkEnvelope::Append
3205        }
3206        (CreateSinkConnection::Iceberg { .. }, None, None) => {
3207            sql_bail!("MODE clause is required")
3208        }
3209        (CreateSinkConnection::Iceberg { .. }, Some(_), _) => {
3210            sql_bail!("ENVELOPE is not supported for Iceberg sinks, use MODE instead")
3211        }
3212    };
3213
3214    let from_name = &from;
3215    let from = scx.get_item_by_resolved_name(&from)?;
3216
3217    {
3218        use CatalogItemType::*;
3219        match from.item_type() {
3220            Table | Source | MaterializedView => {
3221                if from.replacement_target().is_some() {
3222                    let name = scx.catalog.minimal_qualification(from.name());
3223                    return Err(PlanError::InvalidSinkFrom {
3224                        name: name.to_string(),
3225                        item_type: format!("replacement {}", from.item_type()),
3226                    });
3227                }
3228            }
3229            Sink | View | Index | Type | Func | Secret | Connection => {
3230                let name = scx.catalog.minimal_qualification(from.name());
3231                return Err(PlanError::InvalidSinkFrom {
3232                    name: name.to_string(),
3233                    item_type: from.item_type().to_string(),
3234                });
3235            }
3236        }
3237    }
3238
3239    if from.id().is_system() {
3240        bail_unsupported!("creating a sink directly on a catalog object");
3241    }
3242
3243    let desc = from
3244        .relation_desc()
3245        .ok_or_else(|| sql_err!("item does not have a relation description"))?;
3246    let key_indices = match &connection {
3247        CreateSinkConnection::Kafka { key: Some(key), .. }
3248        | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3249            let key_columns = key
3250                .key_columns
3251                .clone()
3252                .into_iter()
3253                .map(normalize::column_name)
3254                .collect::<Vec<_>>();
3255            let mut uniq = BTreeSet::new();
3256            for col in key_columns.iter() {
3257                if !uniq.insert(col) {
3258                    sql_bail!("duplicate column referenced in KEY: {}", col);
3259                }
3260            }
3261            let indices = key_columns
3262                .iter()
3263                .map(|col| -> anyhow::Result<usize> {
3264                    let name_idx =
3265                        desc.get_by_name(col)
3266                            .map(|(idx, _type)| idx)
3267                            .ok_or_else(|| {
3268                                sql_err!("column referenced in KEY does not exist: {}", col)
3269                            })?;
3270                    if desc.get_unambiguous_name(name_idx).is_none() {
3271                        sql_err!("column referenced in KEY is ambiguous: {}", col);
3272                    }
3273                    Ok(name_idx)
3274                })
3275                .collect::<Result<Vec<_>, _>>()?;
3276
3277            // Iceberg equality deletes require primitive, non-float key columns.
3278            // Use an allow-list so that new types are rejected by default.
3279            if matches!(&connection, CreateSinkConnection::Iceberg { .. }) {
3280                let cols: Vec<_> = desc.iter().collect();
3281                for &idx in &indices {
3282                    let (col_name, col_type) = cols[idx];
3283                    let scalar = &col_type.scalar_type;
3284                    let is_valid = matches!(
3285                        scalar,
3286                        // integers
3287                        SqlScalarType::Bool
3288                            | SqlScalarType::Int16
3289                            | SqlScalarType::Int32
3290                            | SqlScalarType::Int64
3291                            | SqlScalarType::UInt16
3292                            | SqlScalarType::UInt32
3293                            | SqlScalarType::UInt64
3294                            // decimal / numeric
3295                            | SqlScalarType::Numeric { .. }
3296                            // date / time
3297                            | SqlScalarType::Date
3298                            | SqlScalarType::Time
3299                            | SqlScalarType::Timestamp { .. }
3300                            | SqlScalarType::TimestampTz { .. }
3301                            | SqlScalarType::Interval
3302                            | SqlScalarType::MzTimestamp
3303                            // string-like
3304                            | SqlScalarType::String
3305                            | SqlScalarType::Char { .. }
3306                            | SqlScalarType::VarChar { .. }
3307                            | SqlScalarType::PgLegacyChar
3308                            | SqlScalarType::PgLegacyName
3309                            | SqlScalarType::Bytes
3310                            | SqlScalarType::Jsonb
3311                            // identifiers
3312                            | SqlScalarType::Uuid
3313                            | SqlScalarType::Oid
3314                            | SqlScalarType::RegProc
3315                            | SqlScalarType::RegType
3316                            | SqlScalarType::RegClass
3317                            | SqlScalarType::MzAclItem
3318                            | SqlScalarType::AclItem
3319                            | SqlScalarType::Int2Vector
3320                            // ranges
3321                            | SqlScalarType::Range { .. }
3322                    );
3323                    if !is_valid {
3324                        return Err(PlanError::IcebergSinkUnsupportedKeyType {
3325                            column: col_name.to_string(),
3326                            column_type: format!("{:?}", scalar),
3327                        });
3328                    }
3329                }
3330            }
3331
3332            let is_valid_key = desc
3333                .typ()
3334                .keys
3335                .iter()
3336                .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3337
3338            if !is_valid_key && envelope == SinkEnvelope::Upsert {
3339                if key.not_enforced {
3340                    scx.catalog
3341                        .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3342                            key: key_columns.clone(),
3343                            name: name.item.clone(),
3344                        })
3345                } else {
3346                    return Err(PlanError::UpsertSinkWithInvalidKey {
3347                        name: from_name.full_name_str(),
3348                        desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3349                        valid_keys: desc
3350                            .typ()
3351                            .keys
3352                            .iter()
3353                            .map(|key| {
3354                                key.iter()
3355                                    .map(|col| desc.get_name(*col).as_str().into())
3356                                    .collect()
3357                            })
3358                            .collect(),
3359                    });
3360                }
3361            }
3362            Some(indices)
3363        }
3364        CreateSinkConnection::Kafka { key: None, .. }
3365        | CreateSinkConnection::Iceberg { key: None, .. } => None,
3366    };
3367
3368    if key_indices.is_some() && envelope == SinkEnvelope::Append {
3369        sql_bail!("KEY is not supported for MODE APPEND Iceberg sinks");
3370    }
3371
3372    // Reject input columns that clash with the columns MODE APPEND adds to the Iceberg table.
3373    if envelope == SinkEnvelope::Append {
3374        if let CreateSinkConnection::Iceberg { .. } = &connection {
3375            use mz_storage_types::sinks::{
3376                ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN,
3377            };
3378            for (col_name, _) in desc.iter() {
3379                if col_name.as_str() == ICEBERG_APPEND_DIFF_COLUMN
3380                    || col_name.as_str() == ICEBERG_APPEND_TIMESTAMP_COLUMN
3381                {
3382                    sql_bail!(
3383                        "column {} conflicts with the system column that MODE APPEND \
3384                         adds to the Iceberg table",
3385                        col_name.quoted()
3386                    );
3387                }
3388            }
3389        }
3390    }
3391
3392    let headers_index = match &connection {
3393        CreateSinkConnection::Kafka {
3394            headers: Some(headers),
3395            ..
3396        } => {
3397            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3398
3399            match envelope {
3400                SinkEnvelope::Upsert | SinkEnvelope::Append => (),
3401                SinkEnvelope::Debezium => {
3402                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3403                }
3404            };
3405
3406            let headers = normalize::column_name(headers.clone());
3407            let (idx, ty) = desc
3408                .get_by_name(&headers)
3409                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3410
3411            if desc.get_unambiguous_name(idx).is_none() {
3412                sql_bail!("HEADERS column ({}) is ambiguous", headers);
3413            }
3414
3415            match &ty.scalar_type {
3416                SqlScalarType::Map { value_type, .. }
3417                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3418                _ => sql_bail!(
3419                    "HEADERS column must have type map[text => text] or map[text => bytea]"
3420                ),
3421            }
3422
3423            Some(idx)
3424        }
3425        _ => None,
3426    };
3427
3428    // pick the first valid natural relation key, if any
3429    let relation_key_indices = desc.typ().keys.get(0).cloned();
3430
3431    let key_desc_and_indices = key_indices.map(|key_indices| {
3432        let cols = desc
3433            .iter()
3434            .map(|(name, ty)| (name.clone(), ty.clone()))
3435            .collect::<Vec<_>>();
3436        let (names, types): (Vec<_>, Vec<_>) =
3437            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3438        let typ = SqlRelationType::new(types);
3439        (RelationDesc::new(typ, names), key_indices)
3440    });
3441
3442    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3443        return Err(PlanError::UpsertSinkWithoutKey);
3444    }
3445
3446    let CreateSinkOptionExtracted {
3447        snapshot,
3448        version,
3449        partition_strategy: _,
3450        seen: _,
3451        commit_interval,
3452    } = with_options.try_into()?;
3453
3454    let connection_builder = match connection {
3455        CreateSinkConnection::Kafka {
3456            connection,
3457            options,
3458            ..
3459        } => kafka_sink_builder(
3460            scx,
3461            connection,
3462            options,
3463            format,
3464            relation_key_indices,
3465            key_desc_and_indices,
3466            headers_index,
3467            desc.into_owned(),
3468            envelope,
3469            from.id(),
3470            commit_interval,
3471        )?,
3472        CreateSinkConnection::Iceberg {
3473            connection,
3474            aws_connection,
3475            options,
3476            ..
3477        } => iceberg_sink_builder(
3478            scx,
3479            connection,
3480            aws_connection,
3481            options,
3482            relation_key_indices,
3483            key_desc_and_indices,
3484            commit_interval,
3485            &desc,
3486        )?,
3487    };
3488
3489    // WITH SNAPSHOT defaults to true
3490    let with_snapshot = snapshot.unwrap_or(true);
3491    // VERSION defaults to 0
3492    let version = version.unwrap_or(0);
3493
3494    // We will rewrite the cluster if one is not provided, so we must use the
3495    // `in_cluster` value we plan to normalize when we canonicalize the create
3496    // statement.
3497    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3498    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3499
3500    Ok(Plan::CreateSink(CreateSinkPlan {
3501        name,
3502        sink: Sink {
3503            create_sql,
3504            from: from.global_id(),
3505            connection: connection_builder,
3506            envelope,
3507            version,
3508            commit_interval,
3509        },
3510        with_snapshot,
3511        if_not_exists,
3512        in_cluster: in_cluster.id(),
3513    }))
3514}
3515
3516fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3517    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3518
3519    let existing_keys = desc
3520        .typ()
3521        .keys
3522        .iter()
3523        .map(|key_columns| {
3524            key_columns
3525                .iter()
3526                .map(|col| desc.get_name(*col).as_str())
3527                .join(", ")
3528        })
3529        .join(", ");
3530
3531    sql_err!(
3532        "Key constraint ({}) conflicts with existing key ({})",
3533        user_keys,
3534        existing_keys
3535    )
3536}
3537
/// Creating this by hand instead of using generate_extracted_config! macro
/// because the macro doesn't support parameterized enums. See <https://github.com/MaterializeInc/database-issues/issues/6698>
#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    /// Option names already seen, used to reject duplicate specifications.
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    /// Value of the `AVRO KEY FULLNAME` option, if given.
    pub(crate) avro_key_fullname: Option<String>,
    /// Value of the `AVRO VALUE FULLNAME` option, if given.
    pub(crate) avro_value_fullname: Option<String>,
    /// Value of the `NULL DEFAULTS` option (defaults to `false`).
    pub(crate) null_defaults: bool,
    /// `DOC ON` comments targeting the value schema (populated by both
    /// VALUE-qualified and unqualified `DOC ON` options).
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    /// `DOC ON` comments targeting the key schema (populated by both
    /// KEY-qualified and unqualified `DOC ON` options).
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    /// Value of the `KEY COMPATIBILITY LEVEL` option, if given.
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    /// Value of the `VALUE COMPATIBILITY LEVEL` option, if given.
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}
3551
// Hand-rolled counterpart to `generate_extracted_config!` for CSR options;
// see the comment on `CsrConfigOptionExtracted` for why the macro can't be
// used here.
impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
    type Error = crate::plan::PlanError;
    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
        let mut extracted = CsrConfigOptionExtracted::default();
        // `DOC ON` options without a KEY/VALUE qualifier apply to both
        // schemas; collect them here and merge at the end so qualified
        // options take precedence.
        let mut common_doc_comments = BTreeMap::new();
        for option in v {
            // Each option may be specified at most once.
            if !extracted.seen.insert(option.name.clone()) {
                return Err(PlanError::Unstructured({
                    format!("{} specified more than once", option.name)
                }));
            }
            let option_name = option.name.clone();
            let option_name_str = option_name.to_ast_string_simple();
            // Wraps a value-parsing error with the offending option's name.
            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
                option_name: option_name.to_ast_string_simple(),
                err: e.into(),
            };
            // Parses an optional string option value into a CSR compatibility
            // level (case-insensitively); non-string values are rejected.
            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
                val.map(|s| match s {
                    WithOptionValue::Value(Value::String(s)) => {
                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
                    }
                    _ => Err("must be a string".to_string()),
                })
                .transpose()
                .map_err(PlanError::Unstructured)
                .map_err(better_error)
            };
            match option.name {
                CsrConfigOptionName::AvroKeyFullname => {
                    extracted.avro_key_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroValueFullname => {
                    extracted.avro_value_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::NullDefaults => {
                    extracted.null_defaults =
                        <bool>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroDocOn(doc_on) => {
                    // `DOC ON` requires a non-empty string value.
                    let value = String::try_from_value(option.value.ok_or_else(|| {
                        PlanError::InvalidOptionValue {
                            option_name: option_name_str,
                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
                        }
                    })?)
                    .map_err(better_error)?;
                    // The comment may target either a specific column of an
                    // item or an item/type as a whole.
                    let key = match doc_on.identifier {
                        DocOnIdentifier::Column(ast::ColumnName {
                            relation: ResolvedItemName::Item { id, .. },
                            column: ResolvedColumnReference::Column { name, index: _ },
                        }) => DocTarget::Field {
                            object_id: id,
                            column_name: name,
                        },
                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
                            DocTarget::Type(id)
                        }
                        _ => sql_bail!("invalid DOC ON identifier"),
                    };

                    match doc_on.for_schema {
                        DocOnSchema::KeyOnly => {
                            extracted.key_doc_options.insert(key, value);
                        }
                        DocOnSchema::ValueOnly => {
                            extracted.value_doc_options.insert(key, value);
                        }
                        DocOnSchema::All => {
                            common_doc_comments.insert(key, value);
                        }
                    }
                }
                CsrConfigOptionName::KeyCompatibilityLevel => {
                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
                }
                CsrConfigOptionName::ValueCompatibilityLevel => {
                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
                }
            }
        }

        // Unqualified `DOC ON` comments fill in wherever a KEY- or
        // VALUE-specific comment was not already given.
        for (key, value) in common_doc_comments {
            if !extracted.key_doc_options.contains_key(&key) {
                extracted.key_doc_options.insert(key.clone(), value.clone());
            }
            if !extracted.value_doc_options.contains_key(&key) {
                extracted.value_doc_options.insert(key, value);
            }
        }
        Ok(extracted)
    }
}
3647
3648fn iceberg_sink_builder(
3649    scx: &StatementContext,
3650    catalog_connection: ResolvedItemName,
3651    aws_connection: ResolvedItemName,
3652    options: Vec<IcebergSinkConfigOption<Aug>>,
3653    relation_key_indices: Option<Vec<usize>>,
3654    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3655    commit_interval: Option<Duration>,
3656    desc: &RelationDesc,
3657) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3658    scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
3659
3660    // Reject types that arrow-rs's parquet writer cannot handle, before
3661    // sink creation. Pass the iceberg overrides so types iceberg remaps
3662    // (e.g. interval -> string) don't trip the check.
3663    ArrowBuilder::validate_desc_for_parquet(desc, iceberg_type_overrides)
3664        .map_err(|e| sql_err!("{}", e))?;
3665    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3666    let catalog_connection_id = catalog_connection_item.id();
3667    let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
3668    let aws_connection_id = aws_connection_item.id();
3669    if !matches!(
3670        catalog_connection_item.connection()?,
3671        Connection::IcebergCatalog(_)
3672    ) {
3673        sql_bail!(
3674            "{} is not an iceberg catalog connection",
3675            scx.catalog
3676                .resolve_full_name(catalog_connection_item.name())
3677                .to_string()
3678                .quoted()
3679        );
3680    };
3681
3682    if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
3683        sql_bail!(
3684            "{} is not an AWS connection",
3685            scx.catalog
3686                .resolve_full_name(aws_connection_item.name())
3687                .to_string()
3688                .quoted()
3689        );
3690    }
3691
3692    let IcebergSinkConfigOptionExtracted {
3693        table,
3694        namespace,
3695        seen: _,
3696    }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3697
3698    let Some(table) = table else {
3699        sql_bail!("Iceberg sink must specify TABLE");
3700    };
3701    let Some(namespace) = namespace else {
3702        sql_bail!("Iceberg sink must specify NAMESPACE");
3703    };
3704    if commit_interval.is_none() {
3705        sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
3706    }
3707
3708    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3709        catalog_connection_id,
3710        catalog_connection: catalog_connection_id,
3711        aws_connection_id,
3712        aws_connection: aws_connection_id,
3713        table,
3714        namespace,
3715        relation_key_indices,
3716        key_desc_and_indices,
3717    }))
3718}
3719
3720fn kafka_sink_builder(
3721    scx: &StatementContext,
3722    connection: ResolvedItemName,
3723    options: Vec<KafkaSinkConfigOption<Aug>>,
3724    format: Option<FormatSpecifier<Aug>>,
3725    relation_key_indices: Option<Vec<usize>>,
3726    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3727    headers_index: Option<usize>,
3728    value_desc: RelationDesc,
3729    envelope: SinkEnvelope,
3730    sink_from: CatalogItemId,
3731    commit_interval: Option<Duration>,
3732) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3733    // Get Kafka connection.
3734    let connection_item = scx.get_item_by_resolved_name(&connection)?;
3735    let connection_id = connection_item.id();
3736    match connection_item.connection()? {
3737        Connection::Kafka(_) => (),
3738        _ => sql_bail!(
3739            "{} is not a kafka connection",
3740            scx.catalog.resolve_full_name(connection_item.name())
3741        ),
3742    };
3743
3744    if commit_interval.is_some() {
3745        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
3746    }
3747
3748    let KafkaSinkConfigOptionExtracted {
3749        topic,
3750        compression_type,
3751        partition_by,
3752        progress_group_id_prefix,
3753        transactional_id_prefix,
3754        legacy_ids,
3755        topic_config,
3756        topic_metadata_refresh_interval,
3757        topic_partition_count,
3758        topic_replication_factor,
3759        seen: _,
3760    }: KafkaSinkConfigOptionExtracted = options.try_into()?;
3761
3762    let transactional_id = match (transactional_id_prefix, legacy_ids) {
3763        (Some(_), Some(true)) => {
3764            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
3765        }
3766        (None, Some(true)) => KafkaIdStyle::Legacy,
3767        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3768    };
3769
3770    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
3771        (Some(_), Some(true)) => {
3772            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
3773        }
3774        (None, Some(true)) => KafkaIdStyle::Legacy,
3775        (prefix, _) => KafkaIdStyle::Prefix(prefix),
3776    };
3777
3778    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;
3779
3780    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
3781        // This is a librdkafka-enforced restriction that, if violated,
3782        // would result in a runtime error for the source.
3783        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
3784    }
3785
3786    let assert_positive = |val: Option<i32>, name: &str| {
3787        if let Some(val) = val {
3788            if val <= 0 {
3789                sql_bail!("{} must be a positive integer", name);
3790            }
3791        }
3792        val.map(NonNeg::try_from)
3793            .transpose()
3794            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
3795    };
3796    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
3797    let topic_replication_factor =
3798        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;
3799
3800    // Helper method to parse avro connection options for format specifiers that use avro
3801    // for either key or value encoding.
3802    let gen_avro_schema_options = |conn| {
3803        let CsrConnectionAvro {
3804            connection:
3805                CsrConnection {
3806                    connection,
3807                    options,
3808                },
3809            seed,
3810            key_strategy,
3811            value_strategy,
3812        } = conn;
3813        if seed.is_some() {
3814            sql_bail!("SEED option does not make sense with sinks");
3815        }
3816        if key_strategy.is_some() {
3817            sql_bail!("KEY STRATEGY option does not make sense with sinks");
3818        }
3819        if value_strategy.is_some() {
3820            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
3821        }
3822
3823        let item = scx.get_item_by_resolved_name(&connection)?;
3824        let csr_connection = match item.connection()? {
3825            Connection::Csr(_) => item.id(),
3826            _ => {
3827                sql_bail!(
3828                    "{} is not a schema registry connection",
3829                    scx.catalog
3830                        .resolve_full_name(item.name())
3831                        .to_string()
3832                        .quoted()
3833                )
3834            }
3835        };
3836        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;
3837
3838        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
3839            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
3840        }
3841
3842        if key_desc_and_indices.is_some()
3843            && (extracted_options.avro_key_fullname.is_some()
3844                ^ extracted_options.avro_value_fullname.is_some())
3845        {
3846            sql_bail!(
3847                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
3848            );
3849        }
3850
3851        Ok((csr_connection, extracted_options))
3852    };
3853
3854    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
3855        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
3856        Format::Bytes if desc.arity() == 1 => {
3857            let col_type = &desc.typ().column_types[0].scalar_type;
3858            if !mz_pgrepr::Value::can_encode_binary(col_type) {
3859                bail_unsupported!(format!(
3860                    "BYTES format with non-encodable type: {:?}",
3861                    col_type
3862                ));
3863            }
3864
3865            Ok(KafkaSinkFormatType::Bytes)
3866        }
3867        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
3868        Format::Bytes | Format::Text => {
3869            bail_unsupported!("BYTES or TEXT format with multiple columns")
3870        }
3871        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
3872        Format::Avro(AvroSchema::Csr { csr_connection }) => {
3873            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
3874            let schema = if is_key {
3875                AvroSchemaGenerator::new(
3876                    desc.clone(),
3877                    false,
3878                    options.key_doc_options,
3879                    options.avro_key_fullname.as_deref().unwrap_or("row"),
3880                    options.null_defaults,
3881                    Some(sink_from),
3882                    false,
3883                )?
3884                .schema()
3885                .to_string()
3886            } else {
3887                AvroSchemaGenerator::new(
3888                    desc.clone(),
3889                    matches!(envelope, SinkEnvelope::Debezium),
3890                    options.value_doc_options,
3891                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
3892                    options.null_defaults,
3893                    Some(sink_from),
3894                    true,
3895                )?
3896                .schema()
3897                .to_string()
3898            };
3899            Ok(KafkaSinkFormatType::Avro {
3900                schema,
3901                compatibility_level: if is_key {
3902                    options.key_compatibility_level
3903                } else {
3904                    options.value_compatibility_level
3905                },
3906                csr_connection,
3907            })
3908        }
3909        format => bail_unsupported!(format!("sink format {:?}", format)),
3910    };
3911
3912    let partition_by = match &partition_by {
3913        Some(partition_by) => {
3914            let mut scope = Scope::from_source(None, value_desc.iter_names());
3915
3916            match envelope {
3917                SinkEnvelope::Upsert | SinkEnvelope::Append => (),
3918                SinkEnvelope::Debezium => {
3919                    let key_indices: HashSet<_> = key_desc_and_indices
3920                        .as_ref()
3921                        .map(|(_desc, indices)| indices.as_slice())
3922                        .unwrap_or_default()
3923                        .into_iter()
3924                        .collect();
3925                    for (i, item) in scope.items.iter_mut().enumerate() {
3926                        if !key_indices.contains(&i) {
3927                            item.error_if_referenced = Some(|_table, column| {
3928                                PlanError::InvalidPartitionByEnvelopeDebezium {
3929                                    column_name: column.to_string(),
3930                                }
3931                            });
3932                        }
3933                    }
3934                }
3935            };
3936
3937            let ecx = &ExprContext {
3938                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
3939                name: "PARTITION BY",
3940                scope: &scope,
3941                relation_type: value_desc.typ(),
3942                allow_aggregates: false,
3943                allow_subqueries: false,
3944                allow_parameters: false,
3945                allow_windows: false,
3946            };
3947            let expr = plan_expr(ecx, partition_by)?.cast_to(
3948                ecx,
3949                CastContext::Assignment,
3950                &SqlScalarType::UInt64,
3951            )?;
3952            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
3953
3954            Some(expr)
3955        }
3956        _ => None,
3957    };
3958
3959    // Map from the format specifier of the statement to the individual key/value formats for the sink.
3960    let format = match format {
3961        Some(FormatSpecifier::KeyValue { key, value }) => {
3962            let key_format = match key_desc_and_indices.as_ref() {
3963                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
3964                None => None,
3965            };
3966            KafkaSinkFormat {
3967                value_format: map_format(value, &value_desc, false)?,
3968                key_format,
3969            }
3970        }
3971        Some(FormatSpecifier::Bare(format)) => {
3972            let key_format = match key_desc_and_indices.as_ref() {
3973                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
3974                None => None,
3975            };
3976            KafkaSinkFormat {
3977                value_format: map_format(format, &value_desc, false)?,
3978                key_format,
3979            }
3980        }
3981        None => bail_unsupported!("sink without format"),
3982    };
3983
3984    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
3985        connection_id,
3986        connection: connection_id,
3987        format,
3988        topic: topic_name,
3989        relation_key_indices,
3990        key_desc_and_indices,
3991        headers_index,
3992        value_desc,
3993        partition_by,
3994        compression_type,
3995        progress_group_id,
3996        transactional_id,
3997        topic_options: KafkaTopicOptions {
3998            partition_count: topic_partition_count,
3999            replication_factor: topic_replication_factor,
4000            topic_config: topic_config.unwrap_or_default(),
4001        },
4002        topic_metadata_refresh_interval,
4003    }))
4004}
4005
4006pub fn describe_create_index(
4007    _: &StatementContext,
4008    _: CreateIndexStatement<Aug>,
4009) -> Result<StatementDesc, PlanError> {
4010    Ok(StatementDesc::new(None))
4011}
4012
/// Plans `CREATE INDEX`, producing a [`Plan::CreateIndex`].
///
/// Validates that the target item can be indexed, fills in default key
/// expressions and an automatically generated index name when they are
/// omitted, and mutates `stmt` in place so the recorded `create_sql` is
/// fully explicit (name, key parts, and cluster all spelled out).
pub fn plan_create_index(
    scx: &StatementContext,
    mut stmt: CreateIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateIndexStatement {
        name,
        on_name,
        in_cluster,
        key_parts,
        with_options,
        if_not_exists,
    } = &mut stmt;
    let on = scx.get_item_by_resolved_name(on_name)?;

    // Only relation-producing items may be indexed; replacement targets are
    // rejected even when they are of an indexable item type.
    {
        use CatalogItemType::*;
        match on.item_type() {
            Table | Source | View | MaterializedView => {
                if on.replacement_target().is_some() {
                    sql_bail!(
                        "index cannot be created on {} because it is a replacement {}",
                        on_name.full_name_str(),
                        on.item_type(),
                    );
                }
            }
            Sink | Index | Type | Func | Secret | Connection => {
                sql_bail!(
                    "index cannot be created on {} because it is a {}",
                    on_name.full_name_str(),
                    on.item_type(),
                );
            }
        }
    }

    let on_desc = on
        .relation_desc()
        .ok_or_else(|| sql_err!("item does not have a relation description"))?;

    // Explicit key parts are used as-is; otherwise synthesize expressions for
    // the relation's default key.
    let filled_key_parts = match key_parts {
        Some(kp) => kp.to_vec(),
        None => {
            // `key_parts` is None if we're creating a "default" index.
            // Precompute which column names are unambiguous in a single pass,
            // avoiding the O(n * k) cost of calling get_unambiguous_name per
            // key column.
            let mut name_counts = BTreeMap::new();
            for name in on_desc.iter_names() {
                *name_counts.entry(name).or_insert(0usize) += 1;
            }
            let key = on_desc.typ().default_key();
            key.iter()
                .map(|i| {
                    let name = on_desc.get_name(*i);
                    if name_counts.get(name).copied() == Some(1) {
                        // Unambiguous name: reference the column by name.
                        Expr::Identifier(vec![name.clone().into()])
                    } else {
                        // Ambiguous name: fall back to a 1-based ordinal.
                        Expr::Value(Value::Number((i + 1).to_string()))
                    }
                })
                .collect()
        }
    };
    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;

    // Determine the index name: either the user-supplied one, or an
    // automatically generated one.
    let index_name = if let Some(name) = name {
        QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: normalize::ident(name.clone()),
        }
    } else {
        let mut idx_name = QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: on.name().item.clone(),
        };
        if key_parts.is_none() {
            // We're trying to create the "default" index.
            idx_name.item += "_primary_idx";
        } else {
            // Use PG schema for automatically naming indexes:
            // `<table>_<_-separated indexed expressions>_idx`
            let index_name_col_suffix = keys
                .iter()
                .map(|k| match k {
                    mz_expr::MirScalarExpr::Column(i, name) => {
                        match (on_desc.get_unambiguous_name(*i), &name.0) {
                            (Some(col_name), _) => col_name.to_string(),
                            (None, Some(name)) => name.to_string(),
                            (None, None) => format!("{}", i + 1),
                        }
                    }
                    _ => "expr".to_string(),
                })
                .join("_");
            write!(idx_name.item, "_{index_name_col_suffix}_idx")
                .expect("write on strings cannot fail");
            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
        }

        // Generated names are deduplicated unless IF NOT EXISTS was given.
        if !*if_not_exists {
            scx.catalog.find_available_name(idx_name)
        } else {
            idx_name
        }
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&index_name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating indexes when
    // there is an existing object *or* type of the same name.
    //
    // Technically, we only need to prevent coexistence of indexes and types
    // that have an associated relation (record types but not list/map types).
    // Enforcing that would be more complicated, though. It's backwards
    // compatible to weaken this restriction in the future.
    if let (Ok(item), false, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_not_exists,
        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let options = plan_index_options(scx, with_options.clone())?;
    // Fall back to the active cluster when no IN CLUSTER clause was given.
    let cluster_id = match in_cluster {
        None => scx.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };

    *in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    // Normalize `stmt`.
    *name = Some(Ident::new(index_name.item.clone())?);
    *key_parts = Some(filled_key_parts);
    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
    // Extract the RETAIN HISTORY option, if present, as the compaction window.
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    Ok(Plan::CreateIndex(CreateIndexPlan {
        name: index_name,
        index: Index {
            create_sql,
            on: on.global_id(),
            keys,
            cluster_id,
            compaction_window,
        },
        if_not_exists,
    }))
}
4179
4180pub fn describe_create_type(
4181    _: &StatementContext,
4182    _: CreateTypeStatement<Aug>,
4183) -> Result<StatementDesc, PlanError> {
4184    Ok(StatementDesc::new(None))
4185}
4186
/// Plans `CREATE TYPE`, producing a [`Plan::CreateType`].
///
/// Supports the `AS LIST`, `AS MAP`, and record (column list) forms. All
/// component types must be *named* catalog types; anonymous types must
/// first be given a name with a separate `CREATE TYPE`.
pub fn plan_create_type(
    scx: &StatementContext,
    stmt: CreateTypeStatement<Aug>,
) -> Result<Plan, PlanError> {
    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
    let CreateTypeStatement { name, as_type, .. } = stmt;

    // Resolves `data_type` to a named catalog type plus its modifiers,
    // rejecting anonymous types, non-type catalog items, and `char`
    // components. `as_type`/`key` are only used to build error messages.
    fn validate_data_type(
        scx: &StatementContext,
        data_type: ResolvedDataType,
        as_type: &str,
        key: &str,
    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
        let (id, modifiers) = match data_type {
            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
            _ => sql_bail!(
                "CREATE TYPE ... AS {}option {} can only use named data types, but \
                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
                as_type,
                key,
                data_type.human_readable_name(),
            ),
        };

        let item = scx.catalog.get_item(&id);
        match item.type_details() {
            None => sql_bail!(
                "{} must be of class type, but received {} which is of class {}",
                key,
                scx.catalog.resolve_full_name(item.name()),
                item.item_type()
            ),
            Some(CatalogTypeDetails {
                typ: CatalogType::Char,
                ..
            }) => {
                bail_unsupported!("embedding char type in a list or map")
            }
            _ => {
                // Validate that the modifiers are actually valid.
                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;

                Ok((id, modifiers))
            }
        }
    }

    // Build the catalog representation for the requested type variant.
    let inner = match as_type {
        CreateTypeAs::List { options } => {
            let CreateTypeListOptionExtracted {
                element_type,
                seen: _,
            } = CreateTypeListOptionExtracted::try_from(options)?;
            let element_type =
                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
            CatalogType::List {
                element_reference: id,
                element_modifiers: modifiers,
            }
        }
        CreateTypeAs::Map { options } => {
            let CreateTypeMapOptionExtracted {
                key_type,
                value_type,
                seen: _,
            } = CreateTypeMapOptionExtracted::try_from(options)?;
            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
            let (value_id, value_modifiers) =
                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
            CatalogType::Map {
                key_reference: key_id,
                key_modifiers,
                value_reference: value_id,
                value_modifiers,
            }
        }
        CreateTypeAs::Record { column_defs } => {
            // Each column definition becomes a record field; the column name
            // doubles as the error-message key.
            let mut fields = vec![];
            for column_def in column_defs {
                let data_type = column_def.data_type;
                let key = ident(column_def.name.clone());
                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
                fields.push(CatalogRecordField {
                    name: ColumnName::from(key.clone()),
                    type_reference: id,
                    type_modifiers: modifiers,
                });
            }
            CatalogType::Record { fields }
        }
    };

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating types when
    // there is an existing object *or* type of the same name.
    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
        if item.item_type().conflicts_with_type() {
            return Err(PlanError::ItemAlreadyExists {
                name: full_name.to_string(),
                item_type: item.item_type(),
            });
        }
    }

    Ok(Plan::CreateType(CreateTypePlan {
        name,
        typ: Type { create_sql, inner },
    }))
}
4303
// Extracts the `ELEMENT TYPE` option for `CREATE TYPE ... AS LIST`.
generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

// Extracts the `KEY TYPE` and `VALUE TYPE` options for
// `CREATE TYPE ... AS MAP`.
generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);
4311
/// A planned `ALTER ROLE` change: either a set of role attributes or a
/// single role-scoped variable change.
#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    /// Change the role's attributes (password, superuser, login, etc.).
    Attributes(PlannedRoleAttributes),
    /// Set or reset a variable default for the role.
    Variable(PlannedRoleVariable),
}
4317
/// Role attributes extracted from a `CREATE ROLE`/`ALTER ROLE` statement.
///
/// A `None` field means the statement did not mention that attribute.
#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    // INHERIT/NOINHERIT; NOINHERIT is currently rejected during planning.
    pub inherit: Option<bool>,
    // The password supplied via `PASSWORD '...'`.
    pub password: Option<Password>,
    // SCRAM iteration count captured from the system vars when a password
    // is planned.
    pub scram_iterations: Option<NonZeroU32>,
    /// `nopassword` is set to true if the password from the parser is None.
    /// This is semantically different than not supplying a password at all,
    /// to allow for unsetting a password.
    pub nopassword: Option<bool>,
    // SUPERUSER/NOSUPERUSER.
    pub superuser: Option<bool>,
    // LOGIN/NOLOGIN.
    pub login: Option<bool>,
}
4330
4331fn plan_role_attributes(
4332    options: Vec<RoleAttribute>,
4333    scx: &StatementContext,
4334) -> Result<PlannedRoleAttributes, PlanError> {
4335    let mut planned_attributes = PlannedRoleAttributes {
4336        inherit: None,
4337        password: None,
4338        scram_iterations: None,
4339        superuser: None,
4340        login: None,
4341        nopassword: None,
4342    };
4343
4344    for option in options {
4345        match option {
4346            RoleAttribute::Inherit | RoleAttribute::NoInherit
4347                if planned_attributes.inherit.is_some() =>
4348            {
4349                sql_bail!("conflicting or redundant options");
4350            }
4351            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4352                bail_never_supported!(
4353                    "CREATECLUSTER attribute",
4354                    "sql/create-role/#details",
4355                    "Use system privileges instead."
4356                );
4357            }
4358            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4359                bail_never_supported!(
4360                    "CREATEDB attribute",
4361                    "sql/create-role/#details",
4362                    "Use system privileges instead."
4363                );
4364            }
4365            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4366                bail_never_supported!(
4367                    "CREATEROLE attribute",
4368                    "sql/create-role/#details",
4369                    "Use system privileges instead."
4370                );
4371            }
4372            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4373                sql_bail!("conflicting or redundant options");
4374            }
4375
4376            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4377            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4378            RoleAttribute::Password(password) => {
4379                if let Some(password) = password {
4380                    planned_attributes.password = Some(password.into());
4381                    planned_attributes.scram_iterations =
4382                        Some(scx.catalog.system_vars().scram_iterations())
4383                } else {
4384                    planned_attributes.nopassword = Some(true);
4385                }
4386            }
4387            RoleAttribute::SuperUser => {
4388                if planned_attributes.superuser == Some(false) {
4389                    sql_bail!("conflicting or redundant options");
4390                }
4391                planned_attributes.superuser = Some(true);
4392            }
4393            RoleAttribute::NoSuperUser => {
4394                if planned_attributes.superuser == Some(true) {
4395                    sql_bail!("conflicting or redundant options");
4396                }
4397                planned_attributes.superuser = Some(false);
4398            }
4399            RoleAttribute::Login => {
4400                if planned_attributes.login == Some(false) {
4401                    sql_bail!("conflicting or redundant options");
4402                }
4403                planned_attributes.login = Some(true);
4404            }
4405            RoleAttribute::NoLogin => {
4406                if planned_attributes.login == Some(true) {
4407                    sql_bail!("conflicting or redundant options");
4408                }
4409                planned_attributes.login = Some(false);
4410            }
4411        }
4412    }
4413    if planned_attributes.inherit == Some(false) {
4414        bail_unsupported!("non inherit roles");
4415    }
4416
4417    Ok(planned_attributes)
4418}
4419
/// A planned role-default variable change, derived from [`SetRoleVar`].
#[derive(Debug)]
pub enum PlannedRoleVariable {
    /// Set the variable `name` to `value`.
    Set { name: String, value: VariableValue },
    /// Reset the variable `name` to its default.
    Reset { name: String },
}
4425
4426impl PlannedRoleVariable {
4427    pub fn name(&self) -> &str {
4428        match self {
4429            PlannedRoleVariable::Set { name, .. } => name,
4430            PlannedRoleVariable::Reset { name } => name,
4431        }
4432    }
4433}
4434
4435fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4436    let plan = match variable {
4437        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4438            name: name.to_string(),
4439            value: scl::plan_set_variable_to(value)?,
4440        },
4441        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4442            name: name.to_string(),
4443        },
4444    };
4445    Ok(plan)
4446}
4447
4448pub fn describe_create_role(
4449    _: &StatementContext,
4450    _: CreateRoleStatement,
4451) -> Result<StatementDesc, PlanError> {
4452    Ok(StatementDesc::new(None))
4453}
4454
4455pub fn plan_create_role(
4456    scx: &StatementContext,
4457    CreateRoleStatement { name, options }: CreateRoleStatement,
4458) -> Result<Plan, PlanError> {
4459    let attributes = plan_role_attributes(options, scx)?;
4460    Ok(Plan::CreateRole(CreateRolePlan {
4461        name: normalize::ident(name),
4462        attributes: attributes.into(),
4463    }))
4464}
4465
4466pub fn plan_create_network_policy(
4467    ctx: &StatementContext,
4468    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4469) -> Result<Plan, PlanError> {
4470    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4471    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4472
4473    let Some(rule_defs) = policy_options.rules else {
4474        sql_bail!("RULES must be specified when creating network policies.");
4475    };
4476
4477    let mut rules = vec![];
4478    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4479        let NetworkPolicyRuleOptionExtracted {
4480            seen: _,
4481            direction,
4482            action,
4483            address,
4484        } = options.try_into()?;
4485        let (direction, action, address) = match (direction, action, address) {
4486            (Some(direction), Some(action), Some(address)) => (
4487                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4488                NetworkPolicyRuleAction::try_from(action.as_str())?,
4489                PolicyAddress::try_from(address.as_str())?,
4490            ),
4491            (_, _, _) => {
4492                sql_bail!("Direction, Address, and Action must specified when creating a rule")
4493            }
4494        };
4495        rules.push(NetworkPolicyRule {
4496            name: normalize::ident(name),
4497            direction,
4498            action,
4499            address,
4500        });
4501    }
4502
4503    if rules.len()
4504        > ctx
4505            .catalog
4506            .system_vars()
4507            .max_rules_per_network_policy()
4508            .try_into()?
4509    {
4510        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4511    }
4512
4513    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4514        name: normalize::ident(name),
4515        rules,
4516    }))
4517}
4518
4519pub fn plan_alter_network_policy(
4520    ctx: &StatementContext,
4521    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4522) -> Result<Plan, PlanError> {
4523    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4524
4525    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4526    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4527
4528    let Some(rule_defs) = policy_options.rules else {
4529        sql_bail!("RULES must be specified when creating network policies.");
4530    };
4531
4532    let mut rules = vec![];
4533    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4534        let NetworkPolicyRuleOptionExtracted {
4535            seen: _,
4536            direction,
4537            action,
4538            address,
4539        } = options.try_into()?;
4540
4541        let (direction, action, address) = match (direction, action, address) {
4542            (Some(direction), Some(action), Some(address)) => (
4543                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4544                NetworkPolicyRuleAction::try_from(action.as_str())?,
4545                PolicyAddress::try_from(address.as_str())?,
4546            ),
4547            (_, _, _) => {
4548                sql_bail!("Direction, Address, and Action must specified when creating a rule")
4549            }
4550        };
4551        rules.push(NetworkPolicyRule {
4552            name: normalize::ident(name),
4553            direction,
4554            action,
4555            address,
4556        });
4557    }
4558    if rules.len()
4559        > ctx
4560            .catalog
4561            .system_vars()
4562            .max_rules_per_network_policy()
4563            .try_into()?
4564    {
4565        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4566    }
4567
4568    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4569        id: policy.id(),
4570        name: normalize::ident(name),
4571        rules,
4572    }))
4573}
4574
4575pub fn describe_create_cluster(
4576    _: &StatementContext,
4577    _: CreateClusterStatement<Aug>,
4578) -> Result<StatementDesc, PlanError> {
4579    Ok(StatementDesc::new(None))
4580}
4581
// Options accepted by `CREATE CLUSTER` (and reused by `ALTER CLUSTER`).
//
// WARNING:
// DO NOT set any `Default` value here using the built-in mechanism of `generate_extracted_config`!
// These options are also used in ALTER CLUSTER, where not giving an option means that the value of
// that option stays the same. If you were to give a default value here, then not giving that option
// to ALTER CLUSTER would always reset the value of that option to the default.
generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);
4600
// Options for `CREATE`/`ALTER NETWORK POLICY`: currently only the RULES
// list.
generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);

// Per-rule options. All three arrive as strings and are validated during
// planning.
generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);
4612
4613generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4614
// Typed extraction of the `WAIT UNTIL READY (...)` sub-options of `ALTER CLUSTER`.
generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);
4620
// Typed extraction of cluster `FEATURES` (optimizer feature overrides; system
// users only). Defaults of `None` mean "no override", which is safe here —
// unlike `ClusterOption` above, absence never resets anything.
generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
4635
4636/// Convert a [`CreateClusterStatement`] into a [`Plan`].
4637///
4638/// The reverse of [`unplan_create_cluster`].
4639pub fn plan_create_cluster(
4640    scx: &StatementContext,
4641    stmt: CreateClusterStatement<Aug>,
4642) -> Result<Plan, PlanError> {
4643    let plan = plan_create_cluster_inner(scx, stmt)?;
4644
4645    // Roundtrip through unplan and make sure that we end up with the same plan.
4646    if let CreateClusterVariant::Managed(_) = &plan.variant {
4647        let stmt = unplan_create_cluster(scx, plan.clone())
4648            .map_err(|e| PlanError::Replan(e.to_string()))?;
4649        let create_sql = stmt.to_ast_string_stable();
4650        let stmt = parse::parse(&create_sql)
4651            .map_err(|e| PlanError::Replan(e.to_string()))?
4652            .into_element()
4653            .ast;
4654        let (stmt, _resolved_ids) =
4655            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4656        let stmt = match stmt {
4657            Statement::CreateCluster(stmt) => stmt,
4658            stmt => {
4659                return Err(PlanError::Replan(format!(
4660                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4661                )));
4662            }
4663        };
4664        let replan =
4665            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4666        if plan != replan {
4667            return Err(PlanError::Replan(format!(
4668                "replan does not match: plan={plan:?}, replan={replan:?}"
4669            )));
4670        }
4671    }
4672
4673    Ok(Plan::CreateCluster(plan))
4674}
4675
/// Plan a `CREATE CLUSTER` statement into a [`CreateClusterPlan`].
///
/// Unlike [`plan_create_cluster`], this does not run the unplan/replan
/// roundtrip consistency check, so it can be used as the replan target itself.
///
/// Clusters are either *managed* (Materialize provisions replicas from `SIZE`
/// and `REPLICATION FACTOR`) or *unmanaged* (the user supplies an explicit
/// `REPLICAS (...)` list); the two modes accept disjoint option sets, and any
/// option from the wrong mode is rejected with an error.
pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    // Absent an explicit MANAGED option, a cluster is managed exactly when no
    // manual replica list was given.
    let managed = managed.unwrap_or_else(|| replicas.is_none());

    // FEATURES and WORKLOAD CLASS are reserved for system users.
    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        if disk.is_some() {
            // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
            // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
            // we'll be able to remove the `DISK` option entirely.
            if scx.catalog.is_cluster_size_cc(&size) {
                sql_bail!(
                    "DISK option not supported for modern cluster sizes because disk is always enabled"
                );
            }

            scx.catalog
                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
        }

        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            // If we have a non-trivial schedule, then let's not have any replicas initially,
            // to avoid quickly going back and forth if the schedule doesn't want a replica
            // initially.
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        // Plan OptimizerFeatureOverrides.
        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        // Unmanaged cluster: reject every managed-only option individually so
        // the user gets a precise error message.
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}
4842
/// Convert a [`CreateClusterPlan`] into a [`CreateClusterStatement`].
///
/// The reverse of [`plan_create_cluster`]. Only managed clusters can be
/// unplanned; unmanaged clusters return a "not supported" error.
pub fn unplan_create_cluster(
    scx: &StatementContext,
    CreateClusterPlan {
        name,
        variant,
        workload_class,
    }: CreateClusterPlan,
) -> Result<CreateClusterStatement<Aug>, PlanError> {
    match variant {
        CreateClusterVariant::Managed(CreateClusterManagedPlan {
            replication_factor,
            size,
            availability_zones,
            compute,
            optimizer_feature_overrides,
            schedule,
        }) => {
            let schedule = unplan_cluster_schedule(schedule);
            // Exhaustively destructure so that adding a new override field
            // forces an explicit decision here about whether it maps to a
            // cluster feature.
            let OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: _,
                enable_consolidate_after_union_negate: _,
                enable_reduce_mfp_fusion: _,
                enable_cardinality_estimates: _,
                persist_fast_path_limit: _,
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: _,
                enable_dequadratic_eqprop_map: _,
                enable_eq_classes_withholding_errors: _,
                enable_fast_path_plan_insights: _,
                enable_cast_elimination: _,
                enable_case_literal_transform: _,
                enable_simplify_quantified_comparisons: _,
                enable_coalesce_case_transform: _,
            } = optimizer_feature_overrides;
            // The ones from above that don't occur below are not wired up to cluster features.
            let features_extracted = ClusterFeatureExtracted {
                // Seen is ignored when unplanning.
                seen: Default::default(),
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
            };
            let features = features_extracted.into_values(scx.catalog);
            // An empty AZ list was planned from an absent option; restore that.
            let availability_zones = if availability_zones.is_empty() {
                None
            } else {
                Some(availability_zones)
            };
            let (introspection_interval, introspection_debugging) =
                unplan_compute_replica_config(compute);
            // Replication factor cannot be explicitly specified with a refresh schedule, it's
            // always 1 or less.
            let replication_factor = match &schedule {
                ClusterScheduleOptionValue::Manual => Some(replication_factor),
                ClusterScheduleOptionValue::Refresh { .. } => {
                    assert!(
                        replication_factor <= 1,
                        "replication factor, {replication_factor:?}, must be <= 1"
                    );
                    None
                }
            };
            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
            let options_extracted = ClusterOptionExtracted {
                // Seen is ignored when unplanning.
                seen: Default::default(),
                availability_zones,
                disk: None,
                introspection_debugging: Some(introspection_debugging),
                introspection_interval,
                managed: Some(true),
                replicas: None,
                replication_factor,
                size: Some(size),
                schedule: Some(schedule),
                workload_class,
            };
            let options = options_extracted.into_values(scx.catalog);
            let name = Ident::new_unchecked(name);
            Ok(CreateClusterStatement {
                name,
                options,
                features,
            })
        }
        CreateClusterVariant::Unmanaged(_) => {
            bail_unsupported!("SHOW CREATE for unmanaged clusters")
        }
    }
}
4946
// Typed extraction of per-replica options for `CREATE CLUSTER REPLICA` and
// unmanaged-cluster `REPLICAS (...)` definitions; consumed by `plan_replica_config`.
generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);
4962
/// Plan the options of a single cluster replica into a [`ReplicaConfig`].
///
/// A replica is either *orchestrated* (a `SIZE` is given and Materialize
/// provisions it) or *unorchestrated* (explicit `STORAGECTL`/`COMPUTECTL`
/// addresses are given; gated behind an unsafe-mode feature flag). Mixing
/// options from both modes is an error.
fn plan_replica_config(
    scx: &StatementContext,
    options: Vec<ReplicaOption<Aug>>,
) -> Result<ReplicaConfig, PlanError> {
    // NOTE(review): `compute_addresses`, `storage_addresses`, and `workers`
    // are extracted but intentionally not read here (the `..` pattern).
    let ReplicaOptionExtracted {
        availability_zone,
        billed_as,
        computectl_addresses,
        disk,
        internal,
        introspection_debugging,
        introspection_interval,
        size,
        storagectl_addresses,
        ..
    }: ReplicaOptionExtracted = options.try_into()?;

    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;

    match (
        size,
        availability_zone,
        billed_as,
        storagectl_addresses,
        computectl_addresses,
    ) {
        // Common cases we expect end users to hit.
        (None, _, None, None, None) => {
            // We don't mention the unmanaged options in the error message
            // because they are only available in unsafe mode.
            sql_bail!("SIZE option must be specified");
        }
        (Some(size), availability_zone, billed_as, None, None) => {
            if disk.is_some() {
                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
                // we'll be able to remove the `DISK` option entirely.
                if scx.catalog.is_cluster_size_cc(&size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }

            Ok(ReplicaConfig::Orchestrated {
                size,
                availability_zone,
                compute,
                billed_as,
                internal,
            })
        }

        (None, None, None, storagectl_addresses, computectl_addresses) => {
            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;

            // When manually testing Materialize in unsafe mode, it's easy to
            // accidentally omit one of these options, so we try to produce
            // helpful error messages.
            let Some(storagectl_addrs) = storagectl_addresses else {
                sql_bail!("missing STORAGECTL ADDRESSES option");
            };
            let Some(computectl_addrs) = computectl_addresses else {
                sql_bail!("missing COMPUTECTL ADDRESSES option");
            };

            if storagectl_addrs.len() != computectl_addrs.len() {
                sql_bail!(
                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
                );
            }

            if disk.is_some() {
                sql_bail!("DISK can't be specified for unorchestrated clusters");
            }

            Ok(ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            })
        }
        _ => {
            // We don't bother trying to produce a more helpful error message
            // here because no user is likely to hit this path.
            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
        }
    }
}
5055
5056/// Convert an [`Option<OptionalDuration>`] and [`bool`] into a [`ComputeReplicaConfig`].
5057///
5058/// The reverse of [`unplan_compute_replica_config`].
5059fn plan_compute_replica_config(
5060    introspection_interval: Option<OptionalDuration>,
5061    introspection_debugging: bool,
5062) -> Result<ComputeReplicaConfig, PlanError> {
5063    let introspection_interval = introspection_interval
5064        .map(|OptionalDuration(i)| i)
5065        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
5066    let introspection = match introspection_interval {
5067        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5068            interval,
5069            debugging: introspection_debugging,
5070        }),
5071        None if introspection_debugging => {
5072            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5073        }
5074        None => None,
5075    };
5076    let compute = ComputeReplicaConfig { introspection };
5077    Ok(compute)
5078}
5079
5080/// Convert a [`ComputeReplicaConfig`] into an [`Option<OptionalDuration>`] and [`bool`].
5081///
5082/// The reverse of [`plan_compute_replica_config`].
5083fn unplan_compute_replica_config(
5084    compute_replica_config: ComputeReplicaConfig,
5085) -> (Option<OptionalDuration>, bool) {
5086    match compute_replica_config.introspection {
5087        Some(ComputeReplicaIntrospectionConfig {
5088            debugging,
5089            interval,
5090        }) => (Some(OptionalDuration(Some(interval))), debugging),
5091        None => (Some(OptionalDuration(None)), false),
5092    }
5093}
5094
/// Convert a [`ClusterScheduleOptionValue`] into a [`ClusterSchedule`].
///
/// The reverse of [`unplan_cluster_schedule`].
///
/// Validates `HYDRATION TIME ESTIMATE`: it must be non-negative, must not use
/// month-or-larger units, and must roundtrip through both `u64` milliseconds
/// and [`Interval`] (so that unplanning cannot fail).
fn plan_cluster_schedule(
    schedule: ClusterScheduleOptionValue,
) -> Result<ClusterSchedule, PlanError> {
    Ok(match schedule {
        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
        // If `HYDRATION TIME ESTIMATE` is not explicitly given, we default to 0.
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: None,
        } => ClusterSchedule::Refresh {
            hydration_time_estimate: Duration::from_millis(0),
        },
        // Otherwise we convert the `IntervalValue` to a `Duration`.
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: Some(interval_value),
        } => {
            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
            if interval.as_microseconds() < 0 {
                sql_bail!(
                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
                    interval
                );
            }
            if interval.months != 0 {
                // This limitation is because we want this interval to be cleanly convertable
                // to a unix epoch timestamp difference. When the interval involves months, then
                // this is not true anymore, because months have variable lengths.
                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
            }
            let duration = interval.duration()?;
            // Reject values that wouldn't survive the unplan roundtrip.
            if u64::try_from(duration.as_millis()).is_err()
                || Interval::from_duration(&duration).is_err()
            {
                sql_bail!("HYDRATION TIME ESTIMATE too large");
            }
            ClusterSchedule::Refresh {
                hydration_time_estimate: duration,
            }
        }
    })
}
5138
5139/// Convert a [`ClusterSchedule`] into a [`ClusterScheduleOptionValue`].
5140///
5141/// The reverse of [`plan_cluster_schedule`].
5142fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5143    match schedule {
5144        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5145        ClusterSchedule::Refresh {
5146            hydration_time_estimate,
5147        } => {
5148            let interval = Interval::from_duration(&hydration_time_estimate)
5149                .expect("planning ensured that this is convertible back to Interval");
5150            let interval_value = literal::unplan_interval(&interval);
5151            ClusterScheduleOptionValue::Refresh {
5152                hydration_time_estimate: Some(interval_value),
5153            }
5154        }
5155    }
5156}
5157
5158pub fn describe_create_cluster_replica(
5159    _: &StatementContext,
5160    _: CreateClusterReplicaStatement<Aug>,
5161) -> Result<StatementDesc, PlanError> {
5162    Ok(StatementDesc::new(None))
5163}
5164
/// Plan a `CREATE CLUSTER REPLICA` statement into a [`Plan`].
pub fn plan_create_cluster_replica(
    scx: &StatementContext,
    CreateClusterReplicaStatement {
        definition: ReplicaDefinition { name, options },
        of_cluster,
    }: CreateClusterReplicaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = scx
        .catalog
        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
    // If the cluster contains objects that only support a single replica,
    // refuse to add a second one.
    let current_replica_count = cluster.replica_ids().iter().count();
    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
        return Err(PlanError::CreateReplicaFailStorageObjects {
            current_replica_count,
            internal_replica_count,
            hypothetical_replica_count: current_replica_count + 1,
        });
    }

    let config = plan_replica_config(scx, options)?;

    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
        // Internal replicas must not collide with the naming scheme reserved
        // for system-managed replicas.
        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
            return Err(PlanError::MangedReplicaName(name.into_string()));
        }
    } else {
        // Only unmanaged clusters accept manually created (non-internal) replicas.
        ensure_cluster_is_not_managed(scx, cluster.id())?;
    }

    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
        name: normalize::ident(name),
        cluster_id: cluster.id(),
        config,
    }))
}
5201
5202pub fn describe_create_secret(
5203    _: &StatementContext,
5204    _: CreateSecretStatement<Aug>,
5205) -> Result<StatementDesc, PlanError> {
5206    Ok(StatementDesc::new(None))
5207}
5208
5209pub fn plan_create_secret(
5210    scx: &StatementContext,
5211    stmt: CreateSecretStatement<Aug>,
5212) -> Result<Plan, PlanError> {
5213    let CreateSecretStatement {
5214        name,
5215        if_not_exists,
5216        value,
5217    } = &stmt;
5218
5219    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5220    let mut create_sql_statement = stmt.clone();
5221    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5222    let create_sql =
5223        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5224    let secret_as = query::plan_secret_as(scx, value.clone())?;
5225
5226    let secret = Secret {
5227        create_sql,
5228        secret_as,
5229    };
5230
5231    Ok(Plan::CreateSecret(CreateSecretPlan {
5232        name,
5233        secret,
5234        if_not_exists: *if_not_exists,
5235    }))
5236}
5237
5238pub fn describe_create_connection(
5239    _: &StatementContext,
5240    _: CreateConnectionStatement<Aug>,
5241) -> Result<StatementDesc, PlanError> {
5242    Ok(StatementDesc::new(None))
5243}
5244
5245generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5246
/// Plan a `CREATE CONNECTION` statement into a [`Plan`].
///
/// Besides planning the connection details, this decides whether the
/// connection should be validated (explicit `VALIDATE` option, else the
/// system default combined with the connection type's own preference) and,
/// for SSH connections, writes freshly generated public keys back into the
/// statement so they appear in the recorded `create_sql`.
pub fn plan_create_connection(
    scx: &StatementContext,
    mut stmt: CreateConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateConnectionStatement {
        name,
        connection_type,
        values,
        if_not_exists,
        with_options,
    } = stmt.clone();
    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }
    let validate = match options.validate {
        Some(val) => val,
        None => {
            // No explicit option: validate only if the system default is on
            // AND this connection type validates by default.
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && details.to_connection().validate_by_default()
        }
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // For SSH connections, overwrite the public key options based on the
    // connection details, in case we generated new keys during planning.
    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
        stmt.values.retain(|v| {
            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey1,
            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey2,
            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
        });
    }
    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;

    let plan = CreateConnectionPlan {
        name,
        if_not_exists,
        connection: crate::plan::Connection {
            create_sql,
            details,
        },
        validate,
    };
    Ok(Plan::CreateConnection(plan))
}
5314
5315fn plan_drop_database(
5316    scx: &StatementContext,
5317    if_exists: bool,
5318    name: &UnresolvedDatabaseName,
5319    cascade: bool,
5320) -> Result<Option<DatabaseId>, PlanError> {
5321    Ok(match resolve_database(scx, name, if_exists)? {
5322        Some(database) => {
5323            if !cascade && database.has_schemas() {
5324                sql_bail!(
5325                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5326                    name,
5327                );
5328            }
5329            Some(database.id())
5330        }
5331        None => None,
5332    })
5333}
5334
5335pub fn describe_drop_objects(
5336    _: &StatementContext,
5337    _: DropObjectsStatement,
5338) -> Result<StatementDesc, PlanError> {
5339    Ok(StatementDesc::new(None))
5340}
5341
/// Plan a `DROP <object type> ...` statement into a [`Plan`].
///
/// Each name is resolved by the type-specific helper below; names that do not
/// resolve (under `IF EXISTS`) produce a notice instead of an error. The final
/// plan includes both the directly referenced objects and their transitive
/// dependents as computed by the catalog.
pub fn plan_drop_objects(
    scx: &mut StatementContext,
    DropObjectsStatement {
        object_type,
        if_exists,
        names,
        cascade,
    }: DropObjectsStatement,
) -> Result<Plan, PlanError> {
    if object_type == mz_sql_parser::ast::ObjectType::Func {
        bail_unsupported!("DROP FUNCTION");
    }
    let object_type = object_type.into();

    let mut referenced_ids = Vec::new();
    for name in names {
        // Dispatch on the kind of object being named; each helper returns
        // `None` when `IF EXISTS` tolerated a missing object.
        let id = match &name {
            UnresolvedObjectName::Cluster(name) => {
                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
            }
            UnresolvedObjectName::ClusterReplica(name) => {
                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
            }
            UnresolvedObjectName::Database(name) => {
                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
            }
            UnresolvedObjectName::Schema(name) => {
                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
            }
            UnresolvedObjectName::Role(name) => {
                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
            }
            UnresolvedObjectName::Item(name) => {
                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
                    .map(ObjectId::Item)
            }
            UnresolvedObjectName::NetworkPolicy(name) => {
                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
            }
        };
        match id {
            Some(id) => referenced_ids.push(id),
            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            }),
        }
    }
    // Expand to everything that must be dropped alongside the named objects.
    let drop_ids = scx.catalog.object_dependents(&referenced_ids);

    Ok(Plan::DropObjects(DropObjectsPlan {
        referenced_ids,
        drop_ids,
        object_type,
    }))
}
5398
/// Resolve a schema for `DROP SCHEMA`, returning `None` when `IF EXISTS`
/// allowed the name to be absent.
///
/// Temporary schemas, ambient (system) schemas, and — without `CASCADE` —
/// non-empty schemas cannot be dropped.
fn plan_drop_schema(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedSchemaName,
    cascade: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    // Special case for mz_temp: with lazy temporary schema creation, the temp
    // schema may not exist yet, but we still need to return the correct error.
    // Check the schema name directly against MZ_TEMP_SCHEMA.
    let normalized = normalize::unresolved_schema_name(name.clone())?;
    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot drop schema {name} because it is a temporary schema",)
    }

    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
        Some((database_spec, schema_spec)) => {
            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
                sql_bail!(
                    "cannot drop schema {name} because it is required by the database system",
                );
            }
            if let SchemaSpecifier::Temporary = schema_spec {
                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
            }
            let schema = scx.get_schema(&database_spec, &schema_spec);
            if !cascade && schema.has_items() {
                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                sql_bail!(
                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
                    full_schema_name
                );
            }
            Some((database_spec, schema_spec))
        }
        None => None,
    })
}
5436
5437fn plan_drop_role(
5438    scx: &StatementContext,
5439    if_exists: bool,
5440    name: &Ident,
5441) -> Result<Option<RoleId>, PlanError> {
5442    match scx.catalog.resolve_role(name.as_str()) {
5443        Ok(role) => {
5444            let id = role.id();
5445            if &id == scx.catalog.active_role_id() {
5446                sql_bail!("current role cannot be dropped");
5447            }
5448            for role in scx.catalog.get_roles() {
5449                for (member_id, grantor_id) in role.membership() {
5450                    if &id == grantor_id {
5451                        let member_role = scx.catalog.get_role(member_id);
5452                        sql_bail!(
5453                            "cannot drop role {}: still depended up by membership of role {} in role {}",
5454                            name.as_str(),
5455                            role.name(),
5456                            member_role.name()
5457                        );
5458                    }
5459                }
5460            }
5461            Ok(Some(role.id()))
5462        }
5463        Err(_) if if_exists => Ok(None),
5464        Err(e) => Err(e.into()),
5465    }
5466}
5467
5468fn plan_drop_cluster(
5469    scx: &StatementContext,
5470    if_exists: bool,
5471    name: &Ident,
5472    cascade: bool,
5473) -> Result<Option<ClusterId>, PlanError> {
5474    Ok(match resolve_cluster(scx, name, if_exists)? {
5475        Some(cluster) => {
5476            if !cascade && !cluster.bound_objects().is_empty() {
5477                return Err(PlanError::DependentObjectsStillExist {
5478                    object_type: "cluster".to_string(),
5479                    object_name: cluster.name().to_string(),
5480                    dependents: Vec::new(),
5481                });
5482            }
5483            Some(cluster.id())
5484        }
5485        None => None,
5486    })
5487}
5488
5489fn plan_drop_network_policy(
5490    scx: &StatementContext,
5491    if_exists: bool,
5492    name: &Ident,
5493) -> Result<Option<NetworkPolicyId>, PlanError> {
5494    match scx.catalog.resolve_network_policy(name.as_str()) {
5495        Ok(policy) => {
5496            // TODO(network_policy): When we support role based network policies, check if any role
5497            // currently has the specified policy set.
5498            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5499                Err(PlanError::NetworkPolicyInUse)
5500            } else {
5501                Ok(Some(policy.id()))
5502            }
5503        }
5504        Err(_) if if_exists => Ok(None),
5505        Err(e) => Err(e.into()),
5506    }
5507}
5508
5509/// Returns `true` if the cluster has any object that requires a single replica.
5510/// Returns `false` if the cluster has no objects.
5511fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5512    // If this feature is enabled then all objects support multiple-replicas
5513    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5514        false
5515    } else {
5516        // Othewise we check for the existence of sources or sinks
5517        cluster.bound_objects().iter().any(|id| {
5518            let item = scx.catalog.get_item(id);
5519            matches!(
5520                item.item_type(),
5521                CatalogItemType::Sink | CatalogItemType::Source
5522            )
5523        })
5524    }
5525}
5526
5527fn plan_drop_cluster_replica(
5528    scx: &StatementContext,
5529    if_exists: bool,
5530    name: &QualifiedReplica,
5531) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5532    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5533    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5534}
5535
5536/// Returns the [`CatalogItemId`] of the item we should drop, if it exists.
5537fn plan_drop_item(
5538    scx: &StatementContext,
5539    object_type: ObjectType,
5540    if_exists: bool,
5541    name: UnresolvedItemName,
5542    cascade: bool,
5543) -> Result<Option<CatalogItemId>, PlanError> {
5544    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5545        Ok(r) => r,
5546        // Return a more helpful error on `DROP VIEW <materialized-view>`.
5547        Err(PlanError::MismatchedObjectType {
5548            name,
5549            is_type: ObjectType::MaterializedView,
5550            expected_type: ObjectType::View,
5551        }) => {
5552            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5553        }
5554        e => e?,
5555    };
5556
5557    Ok(match resolved {
5558        Some(catalog_item) => {
5559            if catalog_item.id().is_system() {
5560                sql_bail!(
5561                    "cannot drop {} {} because it is required by the database system",
5562                    catalog_item.item_type(),
5563                    scx.catalog.minimal_qualification(catalog_item.name()),
5564                );
5565            }
5566
5567            if !cascade {
5568                for id in catalog_item.used_by() {
5569                    let dep = scx.catalog.get_item(id);
5570                    if dependency_prevents_drop(object_type, dep) {
5571                        return Err(PlanError::DependentObjectsStillExist {
5572                            object_type: catalog_item.item_type().to_string(),
5573                            object_name: scx
5574                                .catalog
5575                                .minimal_qualification(catalog_item.name())
5576                                .to_string(),
5577                            dependents: vec![(
5578                                dep.item_type().to_string(),
5579                                scx.catalog.minimal_qualification(dep.name()).to_string(),
5580                            )],
5581                        });
5582                    }
5583                }
5584                // TODO(jkosh44) It would be nice to also check if any active subscribe or pending peek
5585                //  relies on entry. Unfortunately, we don't have that information readily available.
5586            }
5587            Some(catalog_item.id())
5588        }
5589        None => None,
5590    })
5591}
5592
5593/// Does the dependency `dep` prevent a drop of a non-cascade query?
5594fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5595    match object_type {
5596        ObjectType::Type => true,
5597        ObjectType::Table
5598        | ObjectType::View
5599        | ObjectType::MaterializedView
5600        | ObjectType::Source
5601        | ObjectType::Sink
5602        | ObjectType::Index
5603        | ObjectType::Role
5604        | ObjectType::Cluster
5605        | ObjectType::ClusterReplica
5606        | ObjectType::Secret
5607        | ObjectType::Connection
5608        | ObjectType::Database
5609        | ObjectType::Schema
5610        | ObjectType::Func
5611        | ObjectType::NetworkPolicy => match dep.item_type() {
5612            CatalogItemType::Func
5613            | CatalogItemType::Table
5614            | CatalogItemType::Source
5615            | CatalogItemType::View
5616            | CatalogItemType::MaterializedView
5617            | CatalogItemType::Sink
5618            | CatalogItemType::Type
5619            | CatalogItemType::Secret
5620            | CatalogItemType::Connection => true,
5621            CatalogItemType::Index => false,
5622        },
5623    }
5624}
5625
5626pub fn describe_alter_index_options(
5627    _: &StatementContext,
5628    _: AlterIndexStatement<Aug>,
5629) -> Result<StatementDesc, PlanError> {
5630    Ok(StatementDesc::new(None))
5631}
5632
5633pub fn describe_drop_owned(
5634    _: &StatementContext,
5635    _: DropOwnedStatement<Aug>,
5636) -> Result<StatementDesc, PlanError> {
5637    Ok(StatementDesc::new(None))
5638}
5639
5640pub fn plan_drop_owned(
5641    scx: &StatementContext,
5642    drop: DropOwnedStatement<Aug>,
5643) -> Result<Plan, PlanError> {
5644    let cascade = drop.cascade();
5645    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5646    let mut drop_ids = Vec::new();
5647    let mut privilege_revokes = Vec::new();
5648    let mut default_privilege_revokes = Vec::new();
5649
5650    fn update_privilege_revokes(
5651        object_id: SystemObjectId,
5652        privileges: &PrivilegeMap,
5653        role_ids: &BTreeSet<RoleId>,
5654        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5655    ) {
5656        privilege_revokes.extend(iter::zip(
5657            iter::repeat(object_id),
5658            privileges
5659                .all_values()
5660                .filter(|privilege| role_ids.contains(&privilege.grantee))
5661                .cloned(),
5662        ));
5663    }
5664
5665    // Replicas
5666    for replica in scx.catalog.get_cluster_replicas() {
5667        if role_ids.contains(&replica.owner_id()) {
5668            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5669        }
5670    }
5671
5672    // Clusters
5673    for cluster in scx.catalog.get_clusters() {
5674        if role_ids.contains(&cluster.owner_id()) {
5675            // Note: CASCADE is not required for replicas.
5676            if !cascade {
5677                let non_owned_bound_objects: Vec<_> = cluster
5678                    .bound_objects()
5679                    .into_iter()
5680                    .map(|item_id| scx.catalog.get_item(item_id))
5681                    .filter(|item| !role_ids.contains(&item.owner_id()))
5682                    .collect();
5683                if !non_owned_bound_objects.is_empty() {
5684                    let names: Vec<_> = non_owned_bound_objects
5685                        .into_iter()
5686                        .map(|item| {
5687                            (
5688                                item.item_type().to_string(),
5689                                scx.catalog.resolve_full_name(item.name()).to_string(),
5690                            )
5691                        })
5692                        .collect();
5693                    return Err(PlanError::DependentObjectsStillExist {
5694                        object_type: "cluster".to_string(),
5695                        object_name: cluster.name().to_string(),
5696                        dependents: names,
5697                    });
5698                }
5699            }
5700            drop_ids.push(cluster.id().into());
5701        }
5702        update_privilege_revokes(
5703            SystemObjectId::Object(cluster.id().into()),
5704            cluster.privileges(),
5705            &role_ids,
5706            &mut privilege_revokes,
5707        );
5708    }
5709
5710    // Items
5711    for item in scx.catalog.get_items() {
5712        if role_ids.contains(&item.owner_id()) {
5713            if !cascade {
5714                // Checks if any items still depend on this one, returning an error if so.
5715                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5716                    let non_owned_dependencies: Vec<_> = used_by
5717                        .into_iter()
5718                        .map(|item_id| scx.catalog.get_item(item_id))
5719                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5720                        .filter(|item| !role_ids.contains(&item.owner_id()))
5721                        .collect();
5722                    if !non_owned_dependencies.is_empty() {
5723                        let names: Vec<_> = non_owned_dependencies
5724                            .into_iter()
5725                            .map(|item| {
5726                                let item_typ = item.item_type().to_string();
5727                                let item_name =
5728                                    scx.catalog.resolve_full_name(item.name()).to_string();
5729                                (item_typ, item_name)
5730                            })
5731                            .collect();
5732                        Err(PlanError::DependentObjectsStillExist {
5733                            object_type: item.item_type().to_string(),
5734                            object_name: scx
5735                                .catalog
5736                                .resolve_full_name(item.name())
5737                                .to_string()
5738                                .to_string(),
5739                            dependents: names,
5740                        })
5741                    } else {
5742                        Ok(())
5743                    }
5744                };
5745
5746                // When this item gets dropped it will also drop its progress source, so we need to
5747                // check the users of those.
5748                if let Some(id) = item.progress_id() {
5749                    let progress_item = scx.catalog.get_item(&id);
5750                    check_if_dependents_exist(progress_item.used_by())?;
5751                }
5752                check_if_dependents_exist(item.used_by())?;
5753            }
5754            drop_ids.push(item.id().into());
5755        }
5756        update_privilege_revokes(
5757            SystemObjectId::Object(item.id().into()),
5758            item.privileges(),
5759            &role_ids,
5760            &mut privilege_revokes,
5761        );
5762    }
5763
5764    // Schemas
5765    for schema in scx.catalog.get_schemas() {
5766        if !schema.id().is_temporary() {
5767            if role_ids.contains(&schema.owner_id()) {
5768                if !cascade {
5769                    let non_owned_dependencies: Vec<_> = schema
5770                        .item_ids()
5771                        .map(|item_id| scx.catalog.get_item(&item_id))
5772                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5773                        .filter(|item| !role_ids.contains(&item.owner_id()))
5774                        .collect();
5775                    if !non_owned_dependencies.is_empty() {
5776                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5777                        sql_bail!(
5778                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5779                            full_schema_name.to_string().quoted()
5780                        );
5781                    }
5782                }
5783                drop_ids.push((*schema.database(), *schema.id()).into())
5784            }
5785            update_privilege_revokes(
5786                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5787                schema.privileges(),
5788                &role_ids,
5789                &mut privilege_revokes,
5790            );
5791        }
5792    }
5793
5794    // Databases
5795    for database in scx.catalog.get_databases() {
5796        if role_ids.contains(&database.owner_id()) {
5797            if !cascade {
5798                let non_owned_schemas: Vec<_> = database
5799                    .schemas()
5800                    .into_iter()
5801                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
5802                    .collect();
5803                if !non_owned_schemas.is_empty() {
5804                    sql_bail!(
5805                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5806                        database.name().quoted(),
5807                    );
5808                }
5809            }
5810            drop_ids.push(database.id().into());
5811        }
5812        update_privilege_revokes(
5813            SystemObjectId::Object(database.id().into()),
5814            database.privileges(),
5815            &role_ids,
5816            &mut privilege_revokes,
5817        );
5818    }
5819
5820    // Network policies
5821    for network_policy in scx.catalog.get_network_policies() {
5822        if role_ids.contains(&network_policy.owner_id()) {
5823            drop_ids.push(ObjectId::NetworkPolicy(network_policy.id()));
5824        }
5825        update_privilege_revokes(
5826            SystemObjectId::Object(ObjectId::NetworkPolicy(network_policy.id())),
5827            network_policy.privileges(),
5828            &role_ids,
5829            &mut privilege_revokes,
5830        );
5831    }
5832
5833    // System
5834    update_privilege_revokes(
5835        SystemObjectId::System,
5836        scx.catalog.get_system_privileges(),
5837        &role_ids,
5838        &mut privilege_revokes,
5839    );
5840
5841    for (default_privilege_object, default_privilege_acl_items) in
5842        scx.catalog.get_default_privileges()
5843    {
5844        for default_privilege_acl_item in default_privilege_acl_items {
5845            if role_ids.contains(&default_privilege_object.role_id)
5846                || role_ids.contains(&default_privilege_acl_item.grantee)
5847            {
5848                default_privilege_revokes.push((
5849                    default_privilege_object.clone(),
5850                    default_privilege_acl_item.clone(),
5851                ));
5852            }
5853        }
5854    }
5855
5856    let drop_ids = scx.catalog.object_dependents(&drop_ids);
5857
5858    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5859    if !system_ids.is_empty() {
5860        let mut owners = system_ids
5861            .into_iter()
5862            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5863            .collect::<BTreeSet<_>>()
5864            .into_iter()
5865            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5866        sql_bail!(
5867            "cannot drop objects owned by role {} because they are required by the database system",
5868            owners.join(", "),
5869        );
5870    }
5871
5872    Ok(Plan::DropOwned(DropOwnedPlan {
5873        role_ids: role_ids.into_iter().collect(),
5874        drop_ids,
5875        privilege_revokes,
5876        default_privilege_revokes,
5877    }))
5878}
5879
5880fn plan_retain_history_option(
5881    scx: &StatementContext,
5882    retain_history: Option<OptionalDuration>,
5883) -> Result<Option<CompactionWindow>, PlanError> {
5884    if let Some(OptionalDuration(lcw)) = retain_history {
5885        Ok(Some(plan_retain_history(scx, lcw)?))
5886    } else {
5887        Ok(None)
5888    }
5889}
5890
5891// Convert a specified RETAIN HISTORY option into a compaction window. `None` corresponds to
5892// `DisableCompaction`. A zero duration will error. This is because the `OptionalDuration` type
5893// already converts the zero duration into `None`. This function must not be called in the `RESET
5894// (RETAIN HISTORY)` path, which should be handled by the outer `Option<OptionalDuration>` being
5895// `None`.
5896fn plan_retain_history(
5897    scx: &StatementContext,
5898    lcw: Option<Duration>,
5899) -> Result<CompactionWindow, PlanError> {
5900    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
5901    match lcw {
5902        // A zero duration has already been converted to `None` by `OptionalDuration` (and means
5903        // disable compaction), and should never occur here. Furthermore, some things actually do
5904        // break when this is set to real zero:
5905        // https://github.com/MaterializeInc/database-issues/issues/3798.
5906        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
5907            option_name: "RETAIN HISTORY".to_string(),
5908            err: Box::new(PlanError::Unstructured(
5909                "internal error: unexpectedly zero".to_string(),
5910            )),
5911        }),
5912        Some(duration) => {
5913            // Error if the duration is low and enable_unlimited_retain_history is not set (which
5914            // should only be possible during testing).
5915            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
5916                && scx
5917                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5918                    .is_err()
5919            {
5920                return Err(PlanError::RetainHistoryLow {
5921                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
5922                });
5923            }
5924            Ok(duration.try_into()?)
5925        }
5926        // In the past `RETAIN HISTORY FOR '0'` meant disable compaction. Disabling compaction seems
5927        // to be a bad choice, so prevent it.
5928        None => {
5929            if scx
5930                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5931                .is_err()
5932            {
5933                Err(PlanError::RetainHistoryRequired)
5934            } else {
5935                Ok(CompactionWindow::DisableCompaction)
5936            }
5937        }
5938    }
5939}
5940
// Generates `IndexOptionExtracted`, which collects the supported index `WITH`
// options (currently only `RETAIN HISTORY`) from a list of `IndexOption`s.
generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
5942
5943fn plan_index_options(
5944    scx: &StatementContext,
5945    with_opts: Vec<IndexOption<Aug>>,
5946) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
5947    if !with_opts.is_empty() {
5948        // Index options are not durable.
5949        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
5950    }
5951
5952    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
5953
5954    let mut out = Vec::with_capacity(1);
5955    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5956        out.push(crate::plan::IndexOption::RetainHistory(cw));
5957    }
5958    Ok(out)
5959}
5960
// Generates `TableOptionExtracted`, which collects the supported table `WITH`
// options (`PARTITION BY`, `RETAIN HISTORY`, and the testing-only
// `REDACTED` option) from a list of `TableOption`s.
generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);
5967
5968fn plan_table_options(
5969    scx: &StatementContext,
5970    desc: &RelationDesc,
5971    with_opts: Vec<TableOption<Aug>>,
5972) -> Result<Vec<crate::plan::TableOption>, PlanError> {
5973    let TableOptionExtracted {
5974        partition_by,
5975        retain_history,
5976        redacted_test,
5977        ..
5978    }: TableOptionExtracted = with_opts.try_into()?;
5979
5980    if let Some(partition_by) = partition_by {
5981        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
5982        check_partition_by(desc, partition_by)?;
5983    }
5984
5985    if redacted_test.is_some() {
5986        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
5987    }
5988
5989    let mut out = Vec::with_capacity(1);
5990    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5991        out.push(crate::plan::TableOption::RetainHistory(cw));
5992    }
5993    Ok(out)
5994}
5995
5996pub fn plan_alter_index_options(
5997    scx: &mut StatementContext,
5998    AlterIndexStatement {
5999        index_name,
6000        if_exists,
6001        action,
6002    }: AlterIndexStatement<Aug>,
6003) -> Result<Plan, PlanError> {
6004    let object_type = ObjectType::Index;
6005    match action {
6006        AlterIndexAction::ResetOptions(options) => {
6007            let mut options = options.into_iter();
6008            if let Some(opt) = options.next() {
6009                match opt {
6010                    IndexOptionName::RetainHistory => {
6011                        if options.next().is_some() {
6012                            sql_bail!("RETAIN HISTORY must be only option");
6013                        }
6014                        return alter_retain_history(
6015                            scx,
6016                            object_type,
6017                            if_exists,
6018                            UnresolvedObjectName::Item(index_name),
6019                            None,
6020                        );
6021                    }
6022                }
6023            }
6024            sql_bail!("expected option");
6025        }
6026        AlterIndexAction::SetOptions(options) => {
6027            let mut options = options.into_iter();
6028            if let Some(opt) = options.next() {
6029                match opt.name {
6030                    IndexOptionName::RetainHistory => {
6031                        if options.next().is_some() {
6032                            sql_bail!("RETAIN HISTORY must be only option");
6033                        }
6034                        return alter_retain_history(
6035                            scx,
6036                            object_type,
6037                            if_exists,
6038                            UnresolvedObjectName::Item(index_name),
6039                            opt.value,
6040                        );
6041                    }
6042                }
6043            }
6044            sql_bail!("expected option");
6045        }
6046    }
6047}
6048
6049pub fn describe_alter_cluster_set_options(
6050    _: &StatementContext,
6051    _: AlterClusterStatement<Aug>,
6052) -> Result<StatementDesc, PlanError> {
6053    Ok(StatementDesc::new(None))
6054}
6055
6056pub fn plan_alter_cluster(
6057    scx: &mut StatementContext,
6058    AlterClusterStatement {
6059        name,
6060        action,
6061        if_exists,
6062    }: AlterClusterStatement<Aug>,
6063) -> Result<Plan, PlanError> {
6064    let cluster = match resolve_cluster(scx, &name, if_exists)? {
6065        Some(entry) => entry,
6066        None => {
6067            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6068                name: name.to_ast_string_simple(),
6069                object_type: ObjectType::Cluster,
6070            });
6071
6072            return Ok(Plan::AlterNoop(AlterNoopPlan {
6073                object_type: ObjectType::Cluster,
6074            }));
6075        }
6076    };
6077
6078    let mut options: PlanClusterOption = Default::default();
6079    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
6080
6081    match action {
6082        AlterClusterAction::SetOptions {
6083            options: set_options,
6084            with_options,
6085        } => {
6086            let ClusterOptionExtracted {
6087                availability_zones,
6088                introspection_debugging,
6089                introspection_interval,
6090                managed,
6091                replicas: replica_defs,
6092                replication_factor,
6093                seen: _,
6094                size,
6095                disk,
6096                schedule,
6097                workload_class,
6098            }: ClusterOptionExtracted = set_options.try_into()?;
6099
6100            if !scx.catalog.active_role_id().is_system() {
6101                if workload_class.is_some() {
6102                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
6103                }
6104            }
6105
6106            match managed.unwrap_or_else(|| cluster.is_managed()) {
6107                true => {
6108                    let alter_strategy_extracted =
6109                        ClusterAlterOptionExtracted::try_from(with_options)?;
6110                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
6111
6112                    match alter_strategy {
6113                        AlterClusterPlanStrategy::None => {}
6114                        _ => {
6115                            scx.require_feature_flag(
6116                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
6117                            )?;
6118                        }
6119                    }
6120
6121                    if replica_defs.is_some() {
6122                        sql_bail!("REPLICAS not supported for managed clusters");
6123                    }
6124                    if schedule.is_some()
6125                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6126                    {
6127                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6128                    }
6129
6130                    if let Some(replication_factor) = replication_factor {
6131                        if schedule.is_some()
6132                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6133                        {
6134                            sql_bail!(
6135                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6136                            );
6137                        }
6138                        if let Some(current_schedule) = cluster.schedule() {
6139                            if !matches!(current_schedule, ClusterSchedule::Manual) {
6140                                sql_bail!(
6141                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6142                                );
6143                            }
6144                        }
6145
6146                        let internal_replica_count =
6147                            cluster.replicas().iter().filter(|r| r.internal()).count();
6148                        let hypothetical_replica_count =
6149                            internal_replica_count + usize::cast_from(replication_factor);
6150
6151                        // Total number of replicas running is internal replicas
6152                        // + replication factor.
6153                        if contains_single_replica_objects(scx, cluster)
6154                            && hypothetical_replica_count > 1
6155                        {
6156                            return Err(PlanError::CreateReplicaFailStorageObjects {
6157                                current_replica_count: cluster.replica_ids().iter().count(),
6158                                internal_replica_count,
6159                                hypothetical_replica_count,
6160                            });
6161                        }
6162                    } else if alter_strategy.is_some() {
6163                        // AlterClusterPlanStrategies that are not None will standup pending replicas of the new configuration
6164                        // and violate the single replica for sources constraint. If there are any storage objects (sources or sinks) we should
6165                        // just fail.
6166                        let internal_replica_count =
6167                            cluster.replicas().iter().filter(|r| r.internal()).count();
6168                        let hypothetical_replica_count = internal_replica_count * 2;
6169                        if contains_single_replica_objects(scx, cluster) {
6170                            return Err(PlanError::CreateReplicaFailStorageObjects {
6171                                current_replica_count: cluster.replica_ids().iter().count(),
6172                                internal_replica_count,
6173                                hypothetical_replica_count,
6174                            });
6175                        }
6176                    }
6177                }
6178                false => {
6179                    if !alter_strategy.is_none() {
6180                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6181                    }
6182                    if availability_zones.is_some() {
6183                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6184                    }
6185                    if replication_factor.is_some() {
6186                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6187                    }
6188                    if introspection_debugging.is_some() {
6189                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6190                    }
6191                    if introspection_interval.is_some() {
6192                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6193                    }
6194                    if size.is_some() {
6195                        sql_bail!("SIZE not supported for unmanaged clusters");
6196                    }
6197                    if disk.is_some() {
6198                        sql_bail!("DISK not supported for unmanaged clusters");
6199                    }
6200                    if schedule.is_some()
6201                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6202                    {
6203                        sql_bail!(
6204                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6205                        );
6206                    }
6207                    if let Some(current_schedule) = cluster.schedule() {
6208                        if !matches!(current_schedule, ClusterSchedule::Manual)
6209                            && schedule.is_none()
6210                        {
6211                            sql_bail!(
6212                                "when switching a cluster to unmanaged, if the managed \
6213                                cluster's SCHEDULE is anything other than MANUAL, you have to \
6214                                explicitly set the SCHEDULE to MANUAL"
6215                            );
6216                        }
6217                    }
6218                }
6219            }
6220
6221            let mut replicas = vec![];
6222            for ReplicaDefinition { name, options } in
6223                replica_defs.into_iter().flat_map(Vec::into_iter)
6224            {
6225                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6226            }
6227
6228            if let Some(managed) = managed {
6229                options.managed = AlterOptionParameter::Set(managed);
6230            }
6231            if let Some(replication_factor) = replication_factor {
6232                options.replication_factor = AlterOptionParameter::Set(replication_factor);
6233            }
6234            if let Some(size) = &size {
6235                options.size = AlterOptionParameter::Set(size.clone());
6236            }
6237            if let Some(availability_zones) = availability_zones {
6238                options.availability_zones = AlterOptionParameter::Set(availability_zones);
6239            }
6240            if let Some(introspection_debugging) = introspection_debugging {
6241                options.introspection_debugging =
6242                    AlterOptionParameter::Set(introspection_debugging);
6243            }
6244            if let Some(introspection_interval) = introspection_interval {
6245                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6246            }
6247            if disk.is_some() {
6248                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
6249                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
6250                // we'll be able to remove the `DISK` option entirely.
6251                let size = match size.as_deref() {
6252                    Some(s) => s,
6253                    None => cluster
6254                        .managed_size()
6255                        .ok_or_else(|| sql_err!("cluster is not managed"))?,
6256                };
6257                if scx.catalog.is_cluster_size_cc(size) {
6258                    sql_bail!(
6259                        "DISK option not supported for modern cluster sizes because disk is always enabled"
6260                    );
6261                }
6262
6263                scx.catalog
6264                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
6265            }
6266            if !replicas.is_empty() {
6267                options.replicas = AlterOptionParameter::Set(replicas);
6268            }
6269            if let Some(schedule) = schedule {
6270                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6271            }
6272            if let Some(workload_class) = workload_class {
6273                options.workload_class = AlterOptionParameter::Set(workload_class.0);
6274            }
6275        }
6276        AlterClusterAction::ResetOptions(reset_options) => {
6277            use AlterOptionParameter::Reset;
6278            use ClusterOptionName::*;
6279
6280            if !scx.catalog.active_role_id().is_system() {
6281                if reset_options.contains(&WorkloadClass) {
6282                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
6283                }
6284            }
6285
6286            for option in reset_options {
6287                match option {
6288                    AvailabilityZones => options.availability_zones = Reset,
6289                    Disk => scx
6290                        .catalog
6291                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
6292                    IntrospectionInterval => options.introspection_interval = Reset,
6293                    IntrospectionDebugging => options.introspection_debugging = Reset,
6294                    Managed => options.managed = Reset,
6295                    Replicas => options.replicas = Reset,
6296                    ReplicationFactor => options.replication_factor = Reset,
6297                    Size => options.size = Reset,
6298                    Schedule => options.schedule = Reset,
6299                    WorkloadClass => options.workload_class = Reset,
6300                }
6301            }
6302        }
6303    }
6304    Ok(Plan::AlterCluster(AlterClusterPlan {
6305        id: cluster.id(),
6306        name: cluster.name().to_string(),
6307        options,
6308        strategy: alter_strategy,
6309    }))
6310}
6311
6312pub fn describe_alter_set_cluster(
6313    _: &StatementContext,
6314    _: AlterSetClusterStatement<Aug>,
6315) -> Result<StatementDesc, PlanError> {
6316    Ok(StatementDesc::new(None))
6317}
6318
6319pub fn plan_alter_item_set_cluster(
6320    scx: &StatementContext,
6321    AlterSetClusterStatement {
6322        if_exists,
6323        set_cluster: in_cluster_name,
6324        name,
6325        object_type,
6326    }: AlterSetClusterStatement<Aug>,
6327) -> Result<Plan, PlanError> {
6328    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6329
6330    let object_type = object_type.into();
6331
6332    // Prevent access to `SET CLUSTER` for unsupported objects.
6333    match object_type {
6334        ObjectType::MaterializedView => {}
6335        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6336            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6337        }
6338        ObjectType::Table
6339        | ObjectType::View
6340        | ObjectType::Type
6341        | ObjectType::Role
6342        | ObjectType::Cluster
6343        | ObjectType::ClusterReplica
6344        | ObjectType::Secret
6345        | ObjectType::Connection
6346        | ObjectType::Database
6347        | ObjectType::Schema
6348        | ObjectType::Func
6349        | ObjectType::NetworkPolicy => {
6350            bail_never_supported!(
6351                format!("ALTER {object_type} SET CLUSTER"),
6352                "sql/alter-set-cluster/",
6353                format!("{object_type} has no associated cluster")
6354            )
6355        }
6356    }
6357
6358    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6359
6360    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6361        Some(entry) => {
6362            let current_cluster = entry.cluster_id();
6363            let Some(current_cluster) = current_cluster else {
6364                sql_bail!("No cluster associated with {name}");
6365            };
6366
6367            if current_cluster == in_cluster.id() {
6368                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6369            } else {
6370                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6371                    id: entry.id(),
6372                    set_cluster: in_cluster.id(),
6373                }))
6374            }
6375        }
6376        None => {
6377            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6378                name: name.to_ast_string_simple(),
6379                object_type,
6380            });
6381
6382            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6383        }
6384    }
6385}
6386
6387pub fn describe_alter_object_rename(
6388    _: &StatementContext,
6389    _: AlterObjectRenameStatement,
6390) -> Result<StatementDesc, PlanError> {
6391    Ok(StatementDesc::new(None))
6392}
6393
6394pub fn plan_alter_object_rename(
6395    scx: &mut StatementContext,
6396    AlterObjectRenameStatement {
6397        name,
6398        object_type,
6399        to_item_name,
6400        if_exists,
6401    }: AlterObjectRenameStatement,
6402) -> Result<Plan, PlanError> {
6403    let object_type = object_type.into();
6404    match (object_type, name) {
6405        (
6406            ObjectType::View
6407            | ObjectType::MaterializedView
6408            | ObjectType::Table
6409            | ObjectType::Source
6410            | ObjectType::Index
6411            | ObjectType::Sink
6412            | ObjectType::Secret
6413            | ObjectType::Connection,
6414            UnresolvedObjectName::Item(name),
6415        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6416        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6417            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6418        }
6419        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6420            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6421        }
6422        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6423            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6424        }
6425        (object_type, name) => {
6426            // The earlier dispatch + name resolution should make this
6427            // combination impossible.
6428            bail_internal!("invalid object type '{object_type}' for ALTER RENAME with name {name}")
6429        }
6430    }
6431}
6432
6433pub fn plan_alter_schema_rename(
6434    scx: &mut StatementContext,
6435    name: UnresolvedSchemaName,
6436    to_schema_name: Ident,
6437    if_exists: bool,
6438) -> Result<Plan, PlanError> {
6439    // Special case for mz_temp: with lazy temporary schema creation, the temp
6440    // schema may not exist yet, but we still need to return the correct error.
6441    // Check the schema name directly against MZ_TEMP_SCHEMA.
6442    let normalized = normalize::unresolved_schema_name(name.clone())?;
6443    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6444        sql_bail!(
6445            "cannot rename schemas in the ambient database: {:?}",
6446            mz_repr::namespaces::MZ_TEMP_SCHEMA
6447        );
6448    }
6449
6450    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6451        let object_type = ObjectType::Schema;
6452        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6453            name: name.to_ast_string_simple(),
6454            object_type,
6455        });
6456        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6457    };
6458
6459    // Make sure the name is unique.
6460    if scx
6461        .resolve_schema_in_database(&db_spec, &to_schema_name)
6462        .is_ok()
6463    {
6464        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6465            to_schema_name.clone().into_string(),
6466        )));
6467    }
6468
6469    // Prevent users from renaming system related schemas.
6470    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6471    if schema.id().is_system() {
6472        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6473    }
6474
6475    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6476        cur_schema_spec: (db_spec, schema_spec),
6477        new_schema_name: to_schema_name.into_string(),
6478    }))
6479}
6480
/// Plans `ALTER SCHEMA ... SWAP WITH ...`.
///
/// The swap is staged through a temporary schema name: `gen_temp_suffix` is
/// handed a predicate that reports whether a candidate suffix yields an unused
/// schema name, and must return a suffix that passes it (or an error).
///
/// Rejects swaps involving the temporary (`mz_temp`) schema, schemas in the
/// ambient database, and system schemas. With `if_exists`, a missing
/// `name_a` becomes a notice plus a no-op plan.
pub fn plan_alter_schema_swap<F>(
    scx: &mut StatementContext,
    name_a: UnresolvedSchemaName,
    name_b: Ident,
    if_exists: bool,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    // Special case for mz_temp: with lazy temporary schema creation, the temp
    // schema may not exist yet, but we still need to return the correct error.
    // Check the schema name directly against MZ_TEMP_SCHEMA.
    let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
    if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
    {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }
    // Also check name_b (the target schema name)
    let name_b_str = normalize::ident_ref(&name_b);
    if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }

    // Resolve schema A; with IF EXISTS a resolution failure becomes a notice
    // plus a no-op plan instead of an error.
    let schema_a = match scx.resolve_schema(name_a.clone()) {
        Ok(schema) => schema,
        Err(_) if if_exists => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name_a.to_ast_string_simple(),
                object_type: ObjectType::Schema,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Schema,
            }));
        }
        Err(e) => return Err(e),
    };

    // Schema B is resolved in schema A's database, so both sides of the swap
    // always live in the same (non-ambient) database.
    let db_spec = schema_a.database().clone();
    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
        sql_bail!("cannot swap schemas that are in the ambient database");
    };
    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;

    // We cannot swap system schemas.
    if schema_a.id().is_system() || schema_b.id().is_system() {
        bail_never_supported!("swapping a system schema".to_string())
    }

    // Generate a temporary name we can swap schema_a to.
    //
    // 'check' returns if the temp schema name would be valid.
    const SCHEMA_SWAP_PREFIX: &str = "mz_schema_swap_";
    let check = |temp_suffix: &str| {
        let mut temp_name = ident!(SCHEMA_SWAP_PREFIX);
        temp_name.append_lossy(temp_suffix);
        scx.resolve_schema_in_database(&db_spec, &temp_name)
            .is_err()
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("{SCHEMA_SWAP_PREFIX}{temp_suffix}");

    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
        schema_a_spec: (*schema_a.database(), *schema_a.id()),
        schema_a_name: schema_a.name().schema.to_string(),
        schema_b_spec: (*schema_b.database(), *schema_b.id()),
        schema_b_name: schema_b.name().schema.to_string(),
        name_temp,
    }))
}
6551
/// Plans `ALTER <item> ... RENAME TO ...` for catalog items (tables, views,
/// sources, sinks, indexes, secrets, connections, types, ...).
///
/// Rejects the new name if it collides with an existing item or type per the
/// PostgreSQL-compatibility rules on `CatalogItemType::conflicts_with_type`.
/// With `if_exists`, a missing item becomes a notice plus a no-op plan.
pub fn plan_alter_item_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
        Ok(r) => r,
        // Return a more helpful error on `DROP VIEW <materialized-view>`.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    match resolved {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // The proposed name keeps the item's existing qualifiers; only the
            // final component changes.
            let proposed_name = QualifiedItemName {
                qualifiers: entry.name().qualifiers.clone(),
                item: to_item_name.clone().into_string(),
            };

            // For PostgreSQL compatibility, items and types cannot have
            // overlapping names in a variety of situations. See the comment on
            // `CatalogItemType::conflicts_with_type` for details.
            let conflicting_type_exists;
            let conflicting_item_exists;
            if item_type == CatalogItemType::Type {
                // Renaming a type: any type with the proposed name conflicts,
                // and so does any item whose type conflicts with types.
                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx
                    .catalog
                    .get_item_by_name(&proposed_name)
                    .map(|item| item.item_type().conflicts_with_type())
                    .unwrap_or(false);
            } else {
                // Renaming a non-type item: a type with the proposed name only
                // conflicts when this item's type conflicts with types; any
                // existing item with the proposed name always conflicts.
                conflicting_type_exists = item_type.conflicts_with_type()
                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
            };
            if conflicting_type_exists || conflicting_item_exists {
                sql_bail!("catalog item '{}' already exists", to_item_name);
            }

            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
                id: entry.id(),
                current_full_name: full_name,
                to_name: normalize::ident(to_item_name),
                object_type,
            }))
        }
        None => {
            // IF EXISTS on a missing item: notice plus no-op plan.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
6620
6621pub fn plan_alter_cluster_rename(
6622    scx: &mut StatementContext,
6623    object_type: ObjectType,
6624    name: Ident,
6625    to_name: Ident,
6626    if_exists: bool,
6627) -> Result<Plan, PlanError> {
6628    match resolve_cluster(scx, &name, if_exists)? {
6629        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6630            id: entry.id(),
6631            name: entry.name().to_string(),
6632            to_name: ident(to_name),
6633        })),
6634        None => {
6635            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6636                name: name.to_ast_string_simple(),
6637                object_type,
6638            });
6639
6640            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6641        }
6642    }
6643}
6644
6645pub fn plan_alter_cluster_swap<F>(
6646    scx: &mut StatementContext,
6647    name_a: Ident,
6648    name_b: Ident,
6649    if_exists: bool,
6650    gen_temp_suffix: F,
6651) -> Result<Plan, PlanError>
6652where
6653    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6654{
6655    let cluster_a = match scx.resolve_cluster(Some(&name_a)) {
6656        Ok(cluster) => cluster,
6657        Err(_) if if_exists => {
6658            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6659                name: name_a.to_ast_string_simple(),
6660                object_type: ObjectType::Cluster,
6661            });
6662            return Ok(Plan::AlterNoop(AlterNoopPlan {
6663                object_type: ObjectType::Cluster,
6664            }));
6665        }
6666        Err(e) => return Err(e),
6667    };
6668    let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6669
6670    const CLUSTER_SWAP_PREFIX: &str = "mz_cluster_swap_";
6671    let check = |temp_suffix: &str| {
6672        let mut temp_name = ident!(CLUSTER_SWAP_PREFIX);
6673        temp_name.append_lossy(temp_suffix);
6674        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6675            // Temp name does not exist, so we can use it.
6676            Err(CatalogError::UnknownCluster(_)) => true,
6677            // Temp name already exists!
6678            Ok(_) | Err(_) => false,
6679        }
6680    };
6681    let temp_suffix = gen_temp_suffix(&check)?;
6682    let name_temp = format!("{CLUSTER_SWAP_PREFIX}{temp_suffix}");
6683
6684    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6685        id_a: cluster_a.id(),
6686        id_b: cluster_b.id(),
6687        name_a: name_a.into_string(),
6688        name_b: name_b.into_string(),
6689        name_temp,
6690    }))
6691}
6692
6693pub fn plan_alter_cluster_replica_rename(
6694    scx: &mut StatementContext,
6695    object_type: ObjectType,
6696    name: QualifiedReplica,
6697    to_item_name: Ident,
6698    if_exists: bool,
6699) -> Result<Plan, PlanError> {
6700    match resolve_cluster_replica(scx, &name, if_exists)? {
6701        Some((cluster, replica)) => {
6702            ensure_cluster_is_not_managed(scx, cluster.id())?;
6703            Ok(Plan::AlterClusterReplicaRename(
6704                AlterClusterReplicaRenamePlan {
6705                    cluster_id: cluster.id(),
6706                    replica_id: replica,
6707                    name: QualifiedReplica {
6708                        cluster: Ident::new(cluster.name())?,
6709                        replica: name.replica,
6710                    },
6711                    to_name: normalize::ident(to_item_name),
6712                },
6713            ))
6714        }
6715        None => {
6716            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6717                name: name.to_ast_string_simple(),
6718                object_type,
6719            });
6720
6721            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6722        }
6723    }
6724}
6725
6726pub fn describe_alter_object_swap(
6727    _: &StatementContext,
6728    _: AlterObjectSwapStatement,
6729) -> Result<StatementDesc, PlanError> {
6730    Ok(StatementDesc::new(None))
6731}
6732
6733pub fn plan_alter_object_swap(
6734    scx: &mut StatementContext,
6735    stmt: AlterObjectSwapStatement,
6736) -> Result<Plan, PlanError> {
6737    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6738
6739    let AlterObjectSwapStatement {
6740        object_type,
6741        if_exists,
6742        name_a,
6743        name_b,
6744    } = stmt;
6745    let object_type = object_type.into();
6746
6747    // We'll try 10 times to generate a temporary suffix.
6748    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6749        let mut attempts = 0;
6750        let name_temp = loop {
6751            attempts += 1;
6752            if attempts > 10 {
6753                tracing::warn!("Unable to generate temp id for swapping");
6754                sql_bail!("unable to swap!");
6755            }
6756
6757            // Call the provided closure to make sure this name is unique!
6758            let short_id = mz_ore::id_gen::temp_id();
6759            if check_fn(&short_id) {
6760                break short_id;
6761            }
6762        };
6763
6764        Ok(name_temp)
6765    };
6766
6767    match (object_type, name_a, name_b) {
6768        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6769            plan_alter_schema_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
6770        }
6771        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6772            plan_alter_cluster_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
6773        }
6774        (ObjectType::Schema | ObjectType::Cluster, _, _) => {
6775            bail_internal!("name type does not match object type for ALTER SWAP")
6776        }
6777        (
6778            ObjectType::Table
6779            | ObjectType::View
6780            | ObjectType::MaterializedView
6781            | ObjectType::Source
6782            | ObjectType::Sink
6783            | ObjectType::Index
6784            | ObjectType::Type
6785            | ObjectType::Role
6786            | ObjectType::ClusterReplica
6787            | ObjectType::Secret
6788            | ObjectType::Connection
6789            | ObjectType::Database
6790            | ObjectType::Func
6791            | ObjectType::NetworkPolicy,
6792            _,
6793            _,
6794        ) => Err(PlanError::Unsupported {
6795            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6796            discussion_no: None,
6797        }),
6798    }
6799}
6800
6801pub fn describe_alter_retain_history(
6802    _: &StatementContext,
6803    _: AlterRetainHistoryStatement<Aug>,
6804) -> Result<StatementDesc, PlanError> {
6805    Ok(StatementDesc::new(None))
6806}
6807
6808pub fn plan_alter_retain_history(
6809    scx: &StatementContext,
6810    AlterRetainHistoryStatement {
6811        object_type,
6812        if_exists,
6813        name,
6814        history,
6815    }: AlterRetainHistoryStatement<Aug>,
6816) -> Result<Plan, PlanError> {
6817    alter_retain_history(scx, object_type.into(), if_exists, name, history)
6818}
6819
/// Shared planning for `ALTER ... RETAIN HISTORY` across object types.
///
/// `history` carries the requested retention: `Some(RetainHistoryFor(..))`
/// for `SET`, and `None` for `RESET`, which restores the default logical
/// compaction window. Only materialized views, tables, sources, and indexes
/// support the option; `VIEW` is accepted into the match solely to produce a
/// tailored error below. With `if_exists`, a missing object becomes a notice
/// plus a no-op plan.
fn alter_retain_history(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedObjectName,
    history: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    let name = match (object_type, name) {
        (
            // View gets a special error below.
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index,
            UnresolvedObjectName::Item(name),
        ) => name,
        (object_type, _) => {
            bail_unsupported!(format!("RETAIN HISTORY on {object_type}"))
        }
    };
    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
                return Err(PlanError::AlterViewOnMaterializedView(
                    full_name.to_string(),
                ));
            } else if object_type == ObjectType::View {
                sql_bail!("{object_type} does not support RETAIN HISTORY")
            } else if object_type != item_type {
                // The statement's object type must match the resolved item's
                // actual type.
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            // Save the original value so we can write it back down in the create_sql catalog item.
            let (value, lcw) = match &history {
                Some(WithOptionValue::RetainHistoryFor(value)) => {
                    let window = OptionalDuration::try_from_value(value.clone())?;
                    (Some(value.clone()), window.0)
                }
                // None is RESET, so use the default CW.
                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
            };
            // Validate/convert the requested window into a compaction window.
            let window = plan_retain_history(scx, lcw)?;

            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
                id: entry.id(),
                value,
                window,
                object_type,
            }))
        }
        None => {
            // IF EXISTS on a missing object: notice plus no-op plan.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
6891
6892fn alter_source_timestamp_interval(
6893    scx: &StatementContext,
6894    if_exists: bool,
6895    source_name: UnresolvedItemName,
6896    value: Option<WithOptionValue<Aug>>,
6897) -> Result<Plan, PlanError> {
6898    let object_type = ObjectType::Source;
6899    match resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)? {
6900        Some(entry) => {
6901            let full_name = scx.catalog.resolve_full_name(entry.name());
6902            if entry.item_type() != CatalogItemType::Source {
6903                sql_bail!(
6904                    "\"{}\" is a {} not a {}",
6905                    full_name,
6906                    entry.item_type(),
6907                    format!("{object_type}").to_lowercase()
6908                )
6909            }
6910
6911            match value {
6912                Some(val) => {
6913                    let val = match val {
6914                        WithOptionValue::Value(v) => v,
6915                        _ => sql_bail!("TIMESTAMP INTERVAL requires an interval value"),
6916                    };
6917                    let duration = Duration::try_from_value(val.clone())?;
6918
6919                    let min = scx.catalog.system_vars().min_timestamp_interval();
6920                    let max = scx.catalog.system_vars().max_timestamp_interval();
6921                    if duration < min || duration > max {
6922                        return Err(PlanError::InvalidTimestampInterval {
6923                            min,
6924                            max,
6925                            requested: duration,
6926                        });
6927                    }
6928
6929                    Ok(Plan::AlterSourceTimestampInterval(
6930                        AlterSourceTimestampIntervalPlan {
6931                            id: entry.id(),
6932                            value: Some(val),
6933                            interval: duration,
6934                        },
6935                    ))
6936                }
6937                None => {
6938                    let interval = scx.catalog.system_vars().default_timestamp_interval();
6939                    Ok(Plan::AlterSourceTimestampInterval(
6940                        AlterSourceTimestampIntervalPlan {
6941                            id: entry.id(),
6942                            value: None,
6943                            interval,
6944                        },
6945                    ))
6946                }
6947            }
6948        }
6949        None => {
6950            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6951                name: source_name.to_ast_string_simple(),
6952                object_type,
6953            });
6954
6955            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6956        }
6957    }
6958}
6959
6960pub fn describe_alter_secret_options(
6961    _: &StatementContext,
6962    _: AlterSecretStatement<Aug>,
6963) -> Result<StatementDesc, PlanError> {
6964    Ok(StatementDesc::new(None))
6965}
6966
6967pub fn plan_alter_secret(
6968    scx: &mut StatementContext,
6969    stmt: AlterSecretStatement<Aug>,
6970) -> Result<Plan, PlanError> {
6971    let AlterSecretStatement {
6972        name,
6973        if_exists,
6974        value,
6975    } = stmt;
6976    let object_type = ObjectType::Secret;
6977    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6978        Some(entry) => entry.id(),
6979        None => {
6980            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6981                name: name.to_string(),
6982                object_type,
6983            });
6984
6985            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6986        }
6987    };
6988
6989    let secret_as = query::plan_secret_as(scx, value)?;
6990
6991    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
6992}
6993
6994pub fn describe_alter_connection(
6995    _: &StatementContext,
6996    _: AlterConnectionStatement<Aug>,
6997) -> Result<StatementDesc, PlanError> {
6998    Ok(StatementDesc::new(None))
6999}
7000
// Derives `AlterConnectionOptionExtracted`, which collects the `VALIDATE`
// option from an `ALTER CONNECTION ... WITH (...)` clause.
generate_extracted_config!(AlterConnectionOption, (Validate, bool));
7002
7003pub fn plan_alter_connection(
7004    scx: &StatementContext,
7005    stmt: AlterConnectionStatement<Aug>,
7006) -> Result<Plan, PlanError> {
7007    let AlterConnectionStatement {
7008        name,
7009        if_exists,
7010        actions,
7011        with_options,
7012    } = stmt;
7013    let conn_name = normalize::unresolved_item_name(name)?;
7014    let entry = match scx.catalog.resolve_item(&conn_name) {
7015        Ok(entry) => entry,
7016        Err(_) if if_exists => {
7017            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7018                name: conn_name.to_string(),
7019                object_type: ObjectType::Sink,
7020            });
7021
7022            return Ok(Plan::AlterNoop(AlterNoopPlan {
7023                object_type: ObjectType::Connection,
7024            }));
7025        }
7026        Err(e) => return Err(e.into()),
7027    };
7028
7029    let connection = entry.connection()?;
7030
7031    if actions
7032        .iter()
7033        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
7034    {
7035        if actions.len() > 1 {
7036            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
7037        }
7038
7039        if !with_options.is_empty() {
7040            sql_bail!(
7041                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
7042                with_options
7043                    .iter()
7044                    .map(|o| o.to_ast_string_simple())
7045                    .join(", ")
7046            );
7047        }
7048
7049        if !matches!(connection, Connection::Ssh(_)) {
7050            sql_bail!(
7051                "{} is not an SSH connection",
7052                scx.catalog.resolve_full_name(entry.name())
7053            )
7054        }
7055
7056        return Ok(Plan::AlterConnection(AlterConnectionPlan {
7057            id: entry.id(),
7058            action: crate::plan::AlterConnectionAction::RotateKeys,
7059        }));
7060    }
7061
7062    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
7063    if options.validate.is_some() {
7064        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
7065    }
7066
7067    let validate = match options.validate {
7068        Some(val) => val,
7069        None => {
7070            scx.catalog
7071                .system_vars()
7072                .enable_default_connection_validation()
7073                && connection.validate_by_default()
7074        }
7075    };
7076
7077    let connection_type = match connection {
7078        Connection::Aws(_) => CreateConnectionType::Aws,
7079        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
7080        Connection::Kafka(_) => CreateConnectionType::Kafka,
7081        Connection::Csr(_) => CreateConnectionType::Csr,
7082        Connection::Postgres(_) => CreateConnectionType::Postgres,
7083        Connection::Ssh(_) => CreateConnectionType::Ssh,
7084        Connection::MySql(_) => CreateConnectionType::MySql,
7085        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
7086        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
7087    };
7088
7089    // Collect all options irrespective of action taken on them.
7090    let specified_options: BTreeSet<_> = actions
7091        .iter()
7092        .map(|action: &AlterConnectionAction<Aug>| match action {
7093            AlterConnectionAction::SetOption(option) => Ok(option.name.clone()),
7094            AlterConnectionAction::DropOption(name) => Ok(name.clone()),
7095            AlterConnectionAction::RotateKeys => {
7096                Err(internal_err!("RotateKeys is handled separately above"))
7097            }
7098        })
7099        .collect::<Result<_, PlanError>>()?;
7100
7101    for invalid in INALTERABLE_OPTIONS {
7102        if specified_options.contains(invalid) {
7103            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
7104        }
7105    }
7106
7107    connection::validate_options_per_connection_type(connection_type, specified_options)?;
7108
7109    // Partition operations into set and drop.
7110    let mut set_options_vec: Vec<_> = Vec::new();
7111    let mut drop_options: BTreeSet<_> = BTreeSet::new();
7112    for action in actions {
7113        match action {
7114            AlterConnectionAction::SetOption(option) => set_options_vec.push(option),
7115            AlterConnectionAction::DropOption(name) => {
7116                drop_options.insert(name);
7117            }
7118            AlterConnectionAction::RotateKeys => {
7119                bail_internal!("RotateKeys is handled separately above")
7120            }
7121        }
7122    }
7123
7124    let set_options: BTreeMap<_, _> = set_options_vec
7125        .clone()
7126        .into_iter()
7127        .map(|option| (option.name, option.value))
7128        .collect();
7129
7130    // Type check values + avoid duplicates; we don't want to e.g. let users
7131    // drop and set the same option in the same statement, so treating drops as
7132    // sets here is fine.
7133    let connection_options_extracted =
7134        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
7135
7136    let duplicates: Vec<_> = connection_options_extracted
7137        .seen
7138        .intersection(&drop_options)
7139        .collect();
7140
7141    if !duplicates.is_empty() {
7142        sql_bail!(
7143            "cannot both SET and DROP/RESET options {}",
7144            duplicates
7145                .iter()
7146                .map(|option| option.to_string())
7147                .join(", ")
7148        )
7149    }
7150
7151    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
7152        let set_options_count = mutually_exclusive_options
7153            .iter()
7154            .filter(|o| set_options.contains_key(o))
7155            .count();
7156        let drop_options_count = mutually_exclusive_options
7157            .iter()
7158            .filter(|o| drop_options.contains(o))
7159            .count();
7160
7161        // Disallow setting _and_ resetting mutually exclusive options
7162        if set_options_count > 0 && drop_options_count > 0 {
7163            sql_bail!(
7164                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
7165                connection_type,
7166                mutually_exclusive_options
7167                    .iter()
7168                    .map(|option| option.to_string())
7169                    .join(", ")
7170            )
7171        }
7172
7173        // If any option is either set or dropped, ensure all mutually exclusive
7174        // options are dropped. We do this "behind the scenes", even though we
7175        // disallow users from performing the same action because this is the
7176        // mechanism by which we overwrite values elsewhere in the code.
7177        if set_options_count > 0 || drop_options_count > 0 {
7178            drop_options.extend(mutually_exclusive_options.iter().cloned());
7179        }
7180
7181        // n.b. if mutually exclusive options are set, those will error when we
7182        // try to replan the connection.
7183    }
7184
7185    Ok(Plan::AlterConnection(AlterConnectionPlan {
7186        id: entry.id(),
7187        action: crate::plan::AlterConnectionAction::AlterOptions {
7188            set_options,
7189            drop_options,
7190            validate,
7191        },
7192    }))
7193}
7194
7195pub fn describe_alter_sink(
7196    _: &StatementContext,
7197    _: AlterSinkStatement<Aug>,
7198) -> Result<StatementDesc, PlanError> {
7199    Ok(StatementDesc::new(None))
7200}
7201
/// Plans `ALTER SINK`.
///
/// Only `ALTER SINK ... SET FROM <relation>` (`ChangeRelation`) is supported:
/// it re-parses the sink's stored `CREATE SINK` SQL, swaps in the new `FROM`
/// relation, re-plans the statement to validate the new configuration, and
/// bumps the sink version. SET/RESET of options is rejected as unsupported.
pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    // With IF EXISTS, a missing sink degrades to a no-op plus a notice.
    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    // Always ALTER objects from their latest version.
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            // First we reconstruct the original CREATE SINK statement
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            // The stored create SQL is expected to hold exactly one statement.
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .map_err(|_| internal_err!("create SQL of sink was not exactly one statement"))?;
            let Statement::CreateSink(stmt) = stmt.ast else {
                bail_internal!("create SQL of sink is not a CREATE SINK statement");
            };

            // Then resolve and swap the resolved from relation to the new one
            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            // Finally re-plan the modified create sink statement to verify the new configuration is valid
            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
                bail_internal!("plan_sink did not produce a CreateSink plan");
            };

            // Record that this is a new incarnation of the sink.
            plan.sink.version += 1;

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
    }
}
7261
7262pub fn describe_alter_source(
7263    _: &StatementContext,
7264    _: AlterSourceStatement<Aug>,
7265) -> Result<StatementDesc, PlanError> {
7266    // TODO: put the options here, right?
7267    Ok(StatementDesc::new(None))
7268}
7269
// Derives `AlterSourceAddSubsourceOptionExtracted`, which collects the
// `TEXT COLUMNS`, `EXCLUDE COLUMNS`, and `DETAILS` options of
// `ALTER SOURCE ... ADD SUBSOURCE`.
generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);
7276
/// Plans `ALTER SOURCE`.
///
/// Only `RETAIN HISTORY` and `TIMESTAMP INTERVAL` may be SET or RESET here,
/// and each must be the sole option in its list; all other options are
/// rejected. Subsource and reference actions must be rewritten during
/// purification before they reach the planner.
pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    // With IF EXISTS, a missing source degrades to a no-op plus a notice.
    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            let mut options = options.into_iter();
            let option = options
                .next()
                .ok_or_else(|| sql_err!("ALTER SOURCE SET requires at least one option"))?;
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            if option.name == CreateSourceOptionName::TimestampInterval {
                if options.next().is_some() {
                    sql_bail!("TIMESTAMP INTERVAL must be only option");
                }
                return alter_source_timestamp_interval(scx, if_exists, source_name, option.value);
            }
            // n.b we use this statement in purification in a way that cannot be
            // planned directly.
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            let mut options = reset.into_iter();
            let option = options
                .next()
                .ok_or_else(|| sql_err!("ALTER SOURCE RESET requires at least one option"))?;
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be only option");
                }
                // RESET passes `None`, restoring the default retention.
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            if option == CreateSourceOptionName::TimestampInterval {
                if options.next().is_some() {
                    sql_bail!("TIMESTAMP INTERVAL must be only option");
                }
                // RESET passes `None`, restoring the default interval.
                return alter_source_timestamp_interval(scx, if_exists, source_name, None);
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            sql_bail!("ALTER SOURCE...ADD SUBSOURCE must be purified before planning")
        }
        AlterSourceAction::RefreshReferences => {
            sql_bail!("ALTER SOURCE...REFRESH REFERENCES must be purified before planning")
        }
    };
}
7367
7368pub fn describe_alter_system_set(
7369    _: &StatementContext,
7370    _: AlterSystemSetStatement,
7371) -> Result<StatementDesc, PlanError> {
7372    Ok(StatementDesc::new(None))
7373}
7374
7375pub fn plan_alter_system_set(
7376    _: &StatementContext,
7377    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7378) -> Result<Plan, PlanError> {
7379    let name = name.to_string();
7380    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7381        name,
7382        value: scl::plan_set_variable_to(to)?,
7383    }))
7384}
7385
7386pub fn describe_alter_system_reset(
7387    _: &StatementContext,
7388    _: AlterSystemResetStatement,
7389) -> Result<StatementDesc, PlanError> {
7390    Ok(StatementDesc::new(None))
7391}
7392
7393pub fn plan_alter_system_reset(
7394    _: &StatementContext,
7395    AlterSystemResetStatement { name }: AlterSystemResetStatement,
7396) -> Result<Plan, PlanError> {
7397    let name = name.to_string();
7398    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7399}
7400
7401pub fn describe_alter_system_reset_all(
7402    _: &StatementContext,
7403    _: AlterSystemResetAllStatement,
7404) -> Result<StatementDesc, PlanError> {
7405    Ok(StatementDesc::new(None))
7406}
7407
7408pub fn plan_alter_system_reset_all(
7409    _: &StatementContext,
7410    _: AlterSystemResetAllStatement,
7411) -> Result<Plan, PlanError> {
7412    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7413}
7414
7415pub fn describe_alter_role(
7416    _: &StatementContext,
7417    _: AlterRoleStatement<Aug>,
7418) -> Result<StatementDesc, PlanError> {
7419    Ok(StatementDesc::new(None))
7420}
7421
7422pub fn plan_alter_role(
7423    scx: &StatementContext,
7424    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7425) -> Result<Plan, PlanError> {
7426    let option = match option {
7427        AlterRoleOption::Attributes(attrs) => {
7428            let attrs = plan_role_attributes(attrs, scx)?;
7429            PlannedAlterRoleOption::Attributes(attrs)
7430        }
7431        AlterRoleOption::Variable(variable) => {
7432            let var = plan_role_variable(variable)?;
7433            PlannedAlterRoleOption::Variable(var)
7434        }
7435    };
7436
7437    Ok(Plan::AlterRole(AlterRolePlan {
7438        id: name.id,
7439        name: name.name,
7440        option,
7441    }))
7442}
7443
7444pub fn describe_alter_table_add_column(
7445    _: &StatementContext,
7446    _: AlterTableAddColumnStatement<Aug>,
7447) -> Result<StatementDesc, PlanError> {
7448    Ok(StatementDesc::new(None))
7449}
7450
/// Plans `ALTER TABLE ... ADD COLUMN` (feature-gated).
///
/// Resolves the table at its latest version, rejects (or no-ops, with
/// `IF NOT EXISTS`) an already-present column, and emits a plan carrying the
/// new column's name, nullable type, and raw SQL type for catalog storage.
pub fn plan_alter_table_add_column(
    scx: &StatementContext,
    stmt: AlterTableAddColumnStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterTableAddColumnStatement {
        if_exists,
        name,
        if_col_not_exist,
        column_name,
        data_type,
    } = stmt;
    let object_type = ObjectType::Table;

    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;

    let (relation_id, item_name, desc) =
        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
            Some(item) => {
                // Always add columns to the latest version of the item.
                let item_name = scx.catalog.resolve_full_name(item.name());
                let item = item.at_version(RelationVersionSelector::Latest);
                let desc = item
                    .relation_desc()
                    .ok_or_else(|| sql_err!("item does not have a relation description"))?
                    .into_owned();
                (item.id(), item_name, desc)
            }
            None => {
                // With IF EXISTS, a missing table degrades to a no-op notice.
                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                    name: name.to_ast_string_simple(),
                    object_type,
                });
                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
            }
        };

    let column_name = ColumnName::from(column_name.as_str());
    if desc.get_by_name(&column_name).is_some() {
        if if_col_not_exist {
            // IF NOT EXISTS: report and no-op rather than erroring.
            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
                column_name: column_name.to_string(),
                object_name: item_name.item,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        } else {
            return Err(PlanError::ColumnAlreadyExists {
                column_name,
                object_name: item_name.item,
            });
        }
    }

    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
    // TODO(alter_table): Support non-nullable columns with default values.
    let column_type = scalar_type.nullable(true);
    // "unresolve" our data type so we can later update the persisted create_sql.
    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;

    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
        relation_id,
        column_name,
        column_type,
        raw_sql_type,
    }))
}
7516
7517pub fn describe_alter_materialized_view_apply_replacement(
7518    _: &StatementContext,
7519    _: AlterMaterializedViewApplyReplacementStatement,
7520) -> Result<StatementDesc, PlanError> {
7521    Ok(StatementDesc::new(None))
7522}
7523
7524pub fn plan_alter_materialized_view_apply_replacement(
7525    scx: &StatementContext,
7526    stmt: AlterMaterializedViewApplyReplacementStatement,
7527) -> Result<Plan, PlanError> {
7528    let AlterMaterializedViewApplyReplacementStatement {
7529        if_exists,
7530        name,
7531        replacement_name,
7532    } = stmt;
7533
7534    scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
7535
7536    let object_type = ObjectType::MaterializedView;
7537    let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
7538        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7539            name: name.to_ast_string_simple(),
7540            object_type,
7541        });
7542        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7543    };
7544
7545    let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
7546        .ok_or_else(|| sql_err!("replacement materialized view does not exist"))?;
7547
7548    if replacement.replacement_target() != Some(mv.id()) {
7549        return Err(PlanError::InvalidReplacement {
7550            item_type: mv.item_type(),
7551            item_name: scx.catalog.minimal_qualification(mv.name()),
7552            replacement_type: replacement.item_type(),
7553            replacement_name: scx.catalog.minimal_qualification(replacement.name()),
7554        });
7555    }
7556
7557    Ok(Plan::AlterMaterializedViewApplyReplacement(
7558        AlterMaterializedViewApplyReplacementPlan {
7559            id: mv.id(),
7560            replacement_id: replacement.id(),
7561        },
7562    ))
7563}
7564
7565pub fn describe_comment(
7566    _: &StatementContext,
7567    _: CommentStatement<Aug>,
7568) -> Result<StatementDesc, PlanError> {
7569    Ok(StatementDesc::new(None))
7570}
7571
7572pub fn plan_comment(
7573    scx: &mut StatementContext,
7574    stmt: CommentStatement<Aug>,
7575) -> Result<Plan, PlanError> {
7576    const MAX_COMMENT_LENGTH: usize = 1024;
7577
7578    let CommentStatement { object, comment } = stmt;
7579
7580    // TODO(parkmycar): Make max comment length configurable.
7581    if let Some(c) = &comment {
7582        if c.len() > 1024 {
7583            return Err(PlanError::CommentTooLong {
7584                length: c.len(),
7585                max_size: MAX_COMMENT_LENGTH,
7586            });
7587        }
7588    }
7589
7590    let (object_id, column_pos) = match &object {
7591        com_ty @ CommentObjectType::Table { name }
7592        | com_ty @ CommentObjectType::View { name }
7593        | com_ty @ CommentObjectType::MaterializedView { name }
7594        | com_ty @ CommentObjectType::Index { name }
7595        | com_ty @ CommentObjectType::Func { name }
7596        | com_ty @ CommentObjectType::Connection { name }
7597        | com_ty @ CommentObjectType::Source { name }
7598        | com_ty @ CommentObjectType::Sink { name }
7599        | com_ty @ CommentObjectType::Secret { name } => {
7600            let item = scx.get_item_by_resolved_name(name)?;
7601            match (com_ty, item.item_type()) {
7602                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7603                    (CommentObjectId::Table(item.id()), None)
7604                }
7605                (CommentObjectType::View { .. }, CatalogItemType::View) => {
7606                    (CommentObjectId::View(item.id()), None)
7607                }
7608                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7609                    (CommentObjectId::MaterializedView(item.id()), None)
7610                }
7611                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7612                    (CommentObjectId::Index(item.id()), None)
7613                }
7614                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7615                    (CommentObjectId::Func(item.id()), None)
7616                }
7617                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7618                    (CommentObjectId::Connection(item.id()), None)
7619                }
7620                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7621                    (CommentObjectId::Source(item.id()), None)
7622                }
7623                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7624                    (CommentObjectId::Sink(item.id()), None)
7625                }
7626                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7627                    (CommentObjectId::Secret(item.id()), None)
7628                }
7629                (com_ty, cat_ty) => {
7630                    let expected_type = match com_ty {
7631                        CommentObjectType::Table { .. } => ObjectType::Table,
7632                        CommentObjectType::View { .. } => ObjectType::View,
7633                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7634                        CommentObjectType::Index { .. } => ObjectType::Index,
7635                        CommentObjectType::Func { .. } => ObjectType::Func,
7636                        CommentObjectType::Connection { .. } => ObjectType::Connection,
7637                        CommentObjectType::Source { .. } => ObjectType::Source,
7638                        CommentObjectType::Sink { .. } => ObjectType::Sink,
7639                        CommentObjectType::Secret { .. } => ObjectType::Secret,
7640                        _ => sql_bail!("cannot comment on this object type"),
7641                    };
7642
7643                    return Err(PlanError::InvalidObjectType {
7644                        expected_type: SystemObjectType::Object(expected_type),
7645                        actual_type: SystemObjectType::Object(cat_ty.into()),
7646                        object_name: item.name().item.clone(),
7647                    });
7648                }
7649            }
7650        }
7651        CommentObjectType::Type { ty } => match ty {
7652            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7653                sql_bail!("cannot comment on anonymous list or map type");
7654            }
7655            ResolvedDataType::Named { id, modifiers, .. } => {
7656                if !modifiers.is_empty() {
7657                    sql_bail!("cannot comment on type with modifiers");
7658                }
7659                (CommentObjectId::Type(*id), None)
7660            }
7661            ResolvedDataType::Error => bail_internal!("unresolved data type"),
7662        },
7663        CommentObjectType::Column { name } => {
7664            let (item, pos) = scx.get_column_by_resolved_name(name)?;
7665            match item.item_type() {
7666                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7667                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7668                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7669                CatalogItemType::MaterializedView => {
7670                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7671                }
7672                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7673                r => {
7674                    return Err(PlanError::Unsupported {
7675                        feature: format!("Specifying comments on a column of {r}"),
7676                        discussion_no: None,
7677                    });
7678                }
7679            }
7680        }
7681        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7682        CommentObjectType::Database { name } => {
7683            (CommentObjectId::Database(*name.database_id()), None)
7684        }
7685        CommentObjectType::Schema { name } => {
7686            // Temporary schemas cannot have comments - they are connection-specific
7687            // and transient. With lazy temporary schema creation, the temp schema
7688            // may not exist yet, but we still need to return the correct error.
7689            if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
7690                sql_bail!(
7691                    "cannot comment on schema {} because it is a temporary schema",
7692                    mz_repr::namespaces::MZ_TEMP_SCHEMA
7693                );
7694            }
7695            (
7696                CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7697                None,
7698            )
7699        }
7700        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7701        CommentObjectType::ClusterReplica { name } => {
7702            let replica = scx.catalog.resolve_cluster_replica(name)?;
7703            (
7704                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7705                None,
7706            )
7707        }
7708        CommentObjectType::NetworkPolicy { name } => {
7709            (CommentObjectId::NetworkPolicy(name.id), None)
7710        }
7711    };
7712
7713    // Note: the `mz_comments` table uses an `Int4` for the column position, but in the catalog storage we
7714    // store a `usize` which would be a `Uint8`. We guard against a safe conversion here because
7715    // it's the easiest place to raise an error.
7716    //
7717    // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
7718    if let Some(p) = column_pos {
7719        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7720            max_num_columns: MAX_NUM_COLUMNS,
7721            req_num_columns: p,
7722        })?;
7723    }
7724
7725    Ok(Plan::Comment(CommentPlan {
7726        object_id,
7727        sub_component: column_pos,
7728        comment,
7729    }))
7730}
7731
7732pub(crate) fn resolve_cluster<'a>(
7733    scx: &'a StatementContext,
7734    name: &'a Ident,
7735    if_exists: bool,
7736) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7737    match scx.resolve_cluster(Some(name)) {
7738        Ok(cluster) => Ok(Some(cluster)),
7739        Err(_) if if_exists => Ok(None),
7740        Err(e) => Err(e),
7741    }
7742}
7743
7744pub(crate) fn resolve_cluster_replica<'a>(
7745    scx: &'a StatementContext,
7746    name: &QualifiedReplica,
7747    if_exists: bool,
7748) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7749    match scx.resolve_cluster(Some(&name.cluster)) {
7750        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7751            Some(replica_id) => Ok(Some((cluster, *replica_id))),
7752            None if if_exists => Ok(None),
7753            None => Err(sql_err!(
7754                "CLUSTER {} has no CLUSTER REPLICA named {}",
7755                cluster.name(),
7756                name.replica.as_str().quoted(),
7757            )),
7758        },
7759        Err(_) if if_exists => Ok(None),
7760        Err(e) => Err(e),
7761    }
7762}
7763
7764pub(crate) fn resolve_database<'a>(
7765    scx: &'a StatementContext,
7766    name: &'a UnresolvedDatabaseName,
7767    if_exists: bool,
7768) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7769    match scx.resolve_database(name) {
7770        Ok(database) => Ok(Some(database)),
7771        Err(_) if if_exists => Ok(None),
7772        Err(e) => Err(e),
7773    }
7774}
7775
7776pub(crate) fn resolve_schema<'a>(
7777    scx: &'a StatementContext,
7778    name: UnresolvedSchemaName,
7779    if_exists: bool,
7780) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7781    match scx.resolve_schema(name) {
7782        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7783        Err(_) if if_exists => Ok(None),
7784        Err(e) => Err(e),
7785    }
7786}
7787
7788pub(crate) fn resolve_network_policy<'a>(
7789    scx: &'a StatementContext,
7790    name: Ident,
7791    if_exists: bool,
7792) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7793    match scx.catalog.resolve_network_policy(&name.to_string()) {
7794        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7795            id: policy.id(),
7796            name: policy.name().to_string(),
7797        })),
7798        Err(_) if if_exists => Ok(None),
7799        Err(e) => Err(e.into()),
7800    }
7801}
7802
7803pub(crate) fn resolve_item_or_type<'a>(
7804    scx: &'a StatementContext,
7805    object_type: ObjectType,
7806    name: UnresolvedItemName,
7807    if_exists: bool,
7808) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7809    let name = normalize::unresolved_item_name(name)?;
7810    let catalog_item = match object_type {
7811        ObjectType::Type => scx.catalog.resolve_type(&name),
7812        ObjectType::Table
7813        | ObjectType::View
7814        | ObjectType::MaterializedView
7815        | ObjectType::Source
7816        | ObjectType::Sink
7817        | ObjectType::Index
7818        | ObjectType::Role
7819        | ObjectType::Cluster
7820        | ObjectType::ClusterReplica
7821        | ObjectType::Secret
7822        | ObjectType::Connection
7823        | ObjectType::Database
7824        | ObjectType::Schema
7825        | ObjectType::Func
7826        | ObjectType::NetworkPolicy => scx.catalog.resolve_item(&name),
7827    };
7828
7829    match catalog_item {
7830        Ok(item) => {
7831            let is_type = ObjectType::from(item.item_type());
7832            if object_type == is_type {
7833                Ok(Some(item))
7834            } else {
7835                Err(PlanError::MismatchedObjectType {
7836                    name: scx.catalog.minimal_qualification(item.name()),
7837                    is_type,
7838                    expected_type: object_type,
7839                })
7840            }
7841        }
7842        Err(_) if if_exists => Ok(None),
7843        Err(e) => Err(e.into()),
7844    }
7845}
7846
7847/// Returns an error if the given cluster is a managed cluster
7848fn ensure_cluster_is_not_managed(
7849    scx: &StatementContext,
7850    cluster_id: ClusterId,
7851) -> Result<(), PlanError> {
7852    let cluster = scx.catalog.get_cluster(cluster_id);
7853    if cluster.is_managed() {
7854        Err(PlanError::ManagedCluster {
7855            cluster_name: cluster.name().to_string(),
7856        })
7857    } else {
7858        Ok(())
7859    }
7860}