Skip to main content

mz_sql/plan/statement/
ddl.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data definition language (DDL).
11//!
12//! This module houses the handlers for statements that modify the catalog, like
13//! `ALTER`, `CREATE`, and `DROP`.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Write;
17use std::iter;
18use std::num::NonZeroU32;
19use std::time::Duration;
20
21use itertools::Itertools;
22use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
23use mz_arrow_util::builder::ArrowBuilder;
24use mz_auth::password::Password;
25use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
26use mz_expr::{CollectionPlan, UnmaterializableFunc};
27use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
28use mz_ore::cast::{CastFrom, TryCastFrom};
29use mz_ore::collections::{CollectionExt, HashSet};
30use mz_ore::num::NonNeg;
31use mz_ore::str::StrExt;
32use mz_ore::{soft_assert_or_log, soft_panic_or_log};
33use mz_proto::RustType;
34use mz_repr::adt::interval::Interval;
35use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
36use mz_repr::network_policy_id::NetworkPolicyId;
37use mz_repr::optimize::OptimizerFeatureOverrides;
38use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
39use mz_repr::role_id::RoleId;
40use mz_repr::{
41    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
42    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
43    preserves_order, strconv,
44};
45use mz_sql_parser::ast::{
46    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
47    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
48    AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
49    AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
50    AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
51    AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
52    AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
53    AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
54    AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
55    ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
56    ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
57    ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
58    ConnectionOption, ConnectionOptionName, CreateClusterReplicaStatement, CreateClusterStatement,
59    CreateConnectionOption, CreateConnectionOptionName, CreateConnectionStatement,
60    CreateConnectionType, CreateDatabaseStatement, CreateIndexStatement,
61    CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
62    CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
63    CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
64    CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
65    CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
66    CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
67    CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
68    CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
69    CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
70    DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
71    FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
72    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
73    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
74    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
75    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
76    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
77    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
78    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
79    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
80    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
81    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
82    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
83};
84use mz_sql_parser::ident;
85use mz_sql_parser::parser::StatementParseResult;
86use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
87use mz_storage_types::connections::{Connection, KafkaTopicOptions};
88use mz_storage_types::sinks::{
89    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
90    SinkEnvelope, StorageSinkConnection, iceberg_type_overrides,
91};
92use mz_storage_types::sources::encoding::{
93    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
94    SourceDataEncoding, included_column_desc,
95};
96use mz_storage_types::sources::envelope::{
97    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
98};
99use mz_storage_types::sources::kafka::{
100    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
101};
102use mz_storage_types::sources::load_generator::{
103    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
104    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
105};
106use mz_storage_types::sources::mysql::{
107    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
108};
109use mz_storage_types::sources::postgres::{
110    PostgresSourceConnection, PostgresSourcePublicationDetails,
111    ProtoPostgresSourcePublicationDetails,
112};
113use mz_storage_types::sources::sql_server::{
114    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
115};
116use mz_storage_types::sources::{
117    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
118    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
119    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
120    SqlServerSourceExtras, Timeline,
121};
122use mz_storage_types::wire_format::WireFormat;
123use prost::Message;
124
125use crate::ast::display::AstDisplay;
126use crate::catalog::{
127    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
128    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
129};
130use crate::iceberg::IcebergSinkConfigOptionExtracted;
131use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
132use crate::names::{
133    Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
134    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
135    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
136};
137use crate::normalize::{self, ident};
138use crate::plan::error::PlanError;
139use crate::plan::query::{
140    ExprContext, QueryLifetime, plan_expr, scalar_type_from_catalog, scalar_type_from_sql,
141};
142use crate::plan::scope::Scope;
143use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
144use crate::plan::statement::{StatementContext, StatementDesc, scl};
145use crate::plan::typeconv::CastContext;
146use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
147use crate::plan::{
148    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
149    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
150    AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
151    AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
152    AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
153    AlterSourceTimestampIntervalPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
154    AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
155    ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
156    CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
157    CreateConnectionPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
158    CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
159    CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
160    DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
161    NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
162    PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
163    VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
164    WebhookValidation, literal, plan_utils, query, transform_ast,
165};
166use crate::session::vars::{
167    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
168    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
169    ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS, VarInput,
170};
171use crate::{names, parse};
172
173mod connection;
174
// TODO: Figure out what the maximum number of columns we can actually support is, and set that.
//
// The real max is probably higher than this, but it's easier to relax a constraint than make it
// more strict.
/// Largest number of columns a newly planned relation may declare.
const MAX_NUM_COLUMNS: usize = 256;

/// Upper bound (one hour) for the Kafka topic metadata refresh interval.
const MAX_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL: Duration = Duration::from_secs(3_600);
/// Lower bound (one second) for the Kafka topic metadata refresh interval.
const MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL: Duration = Duration::from_secs(1);
183
184static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
185    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
186
187/// Given a relation desc and a column list, checks that:
188/// - the column list is a prefix of the desc;
189/// - all the listed columns are types that have meaningful Persist-level ordering.
190fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
191    if partition_by.len() > desc.len() {
192        tracing::error!(
193            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
194            desc.len(),
195            partition_by.len()
196        );
197        partition_by.truncate(desc.len());
198    }
199
200    let desc_prefix = desc.iter().take(partition_by.len());
201    for (idx, ((desc_name, desc_type), partition_name)) in
202        desc_prefix.zip_eq(partition_by).enumerate()
203    {
204        let partition_name = normalize::column_name(partition_name);
205        if *desc_name != partition_name {
206            sql_bail!(
207                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
208            );
209        }
210        if !preserves_order(&desc_type.scalar_type) {
211            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
212        }
213    }
214    Ok(())
215}
216
217pub fn describe_create_database(
218    _: &StatementContext,
219    _: CreateDatabaseStatement,
220) -> Result<StatementDesc, PlanError> {
221    Ok(StatementDesc::new(None))
222}
223
224pub fn plan_create_database(
225    _: &StatementContext,
226    CreateDatabaseStatement {
227        name,
228        if_not_exists,
229    }: CreateDatabaseStatement,
230) -> Result<Plan, PlanError> {
231    Ok(Plan::CreateDatabase(CreateDatabasePlan {
232        name: normalize::ident(name.0),
233        if_not_exists,
234    }))
235}
236
237pub fn describe_create_schema(
238    _: &StatementContext,
239    _: CreateSchemaStatement,
240) -> Result<StatementDesc, PlanError> {
241    Ok(StatementDesc::new(None))
242}
243
244pub fn plan_create_schema(
245    scx: &StatementContext,
246    CreateSchemaStatement {
247        mut name,
248        if_not_exists,
249    }: CreateSchemaStatement,
250) -> Result<Plan, PlanError> {
251    if name.0.len() > 2 {
252        sql_bail!("schema name {} has more than two components", name);
253    }
254    let schema_name = normalize::ident(
255        name.0
256            .pop()
257            .expect("names always have at least one component"),
258    );
259    let database_spec = match name.0.pop() {
260        None => match scx.catalog.active_database() {
261            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
262            None => sql_bail!("no database specified and no active database"),
263        },
264        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
265            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
266            Err(_) => sql_bail!("invalid database {}", n.as_str()),
267        },
268    };
269    Ok(Plan::CreateSchema(CreateSchemaPlan {
270        database_spec,
271        schema_name,
272        if_not_exists,
273    }))
274}
275
276pub fn describe_create_table(
277    _: &StatementContext,
278    _: CreateTableStatement<Aug>,
279) -> Result<StatementDesc, PlanError> {
280    Ok(StatementDesc::new(None))
281}
282
283pub fn plan_create_table(
284    scx: &StatementContext,
285    stmt: CreateTableStatement<Aug>,
286) -> Result<Plan, PlanError> {
287    let CreateTableStatement {
288        name,
289        columns,
290        constraints,
291        if_not_exists,
292        temporary,
293        with_options,
294    } = &stmt;
295
296    let names: Vec<_> = columns
297        .iter()
298        .filter(|c| {
299            // This set of `names` is used to create the initial RelationDesc.
300            // Columns that have been added at later versions of the table will
301            // get added further below.
302            let is_versioned = c
303                .options
304                .iter()
305                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
306            !is_versioned
307        })
308        .map(|c| normalize::column_name(c.name.clone()))
309        .collect();
310
311    if let Some(dup) = names.iter().duplicates().next() {
312        sql_bail!("column {} specified more than once", dup.quoted());
313    }
314
315    // Build initial relation type that handles declared data types
316    // and NOT NULL constraints.
317    let mut column_types = Vec::with_capacity(columns.len());
318    let mut defaults = Vec::with_capacity(columns.len());
319    let mut changes = BTreeMap::new();
320    let mut keys = Vec::new();
321
322    for (i, c) in columns.into_iter().enumerate() {
323        let aug_data_type = &c.data_type;
324        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
325        let mut nullable = true;
326        let mut default = Expr::null();
327        let mut versioned = false;
328        for option in &c.options {
329            match &option.option {
330                ColumnOption::NotNull => nullable = false,
331                ColumnOption::Default(expr) => {
332                    // Ensure expression can be planned and yields the correct
333                    // type.
334                    let mut expr = expr.clone();
335                    transform_ast::transform(scx, &mut expr)?;
336                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
337                    default = expr.clone();
338                }
339                ColumnOption::Unique { is_primary } => {
340                    keys.push(vec![i]);
341                    if *is_primary {
342                        nullable = false;
343                    }
344                }
345                ColumnOption::Versioned { action, version } => {
346                    let version = RelationVersion::from(*version);
347                    versioned = true;
348
349                    let name = normalize::column_name(c.name.clone());
350                    let typ = ty.clone().nullable(nullable);
351
352                    changes.insert(version, (action.clone(), name, typ));
353                }
354                other => {
355                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
356                }
357            }
358        }
359        // TODO(alter_table): This assumes all versioned columns are at the
360        // end. This will no longer be true when we support dropping columns.
361        if !versioned {
362            column_types.push(ty.nullable(nullable));
363        }
364        defaults.push(default);
365    }
366
367    let mut seen_primary = false;
368    'c: for constraint in constraints {
369        match constraint {
370            TableConstraint::Unique {
371                name: _,
372                columns,
373                is_primary,
374                nulls_not_distinct,
375            } => {
376                if seen_primary && *is_primary {
377                    sql_bail!(
378                        "multiple primary keys for table {} are not allowed",
379                        name.to_ast_string_stable()
380                    );
381                }
382                seen_primary = *is_primary || seen_primary;
383
384                let mut key = vec![];
385                for column in columns {
386                    let column = normalize::column_name(column.clone());
387                    match names.iter().position(|name| *name == column) {
388                        None => sql_bail!("unknown column in constraint: {}", column),
389                        Some(i) => {
390                            let nullable = &mut column_types[i].nullable;
391                            if *is_primary {
392                                if *nulls_not_distinct {
393                                    sql_bail!(
394                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
395                                    );
396                                }
397
398                                *nullable = false;
399                            } else if !(*nulls_not_distinct || !*nullable) {
400                                // Non-primary key unique constraints are only keys if all of their
401                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
402                                break 'c;
403                            }
404
405                            key.push(i);
406                        }
407                    }
408                }
409
410                if *is_primary {
411                    keys.insert(0, key);
412                } else {
413                    keys.push(key);
414                }
415            }
416            TableConstraint::ForeignKey { .. } => {
417                // Foreign key constraints are not presently enforced. We allow
418                // them with feature flags for sqllogictest's sake.
419                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
420            }
421            TableConstraint::Check { .. } => {
422                // Check constraints are not presently enforced. We allow them
423                // with feature flags for sqllogictest's sake.
424                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
425            }
426        }
427    }
428
429    if !keys.is_empty() {
430        // Unique constraints are not presently enforced. We allow them with feature flags for
431        // sqllogictest's sake.
432        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
433    }
434
435    let typ = SqlRelationType::new(column_types).with_keys(keys);
436
437    let temporary = *temporary;
438    let name = if temporary {
439        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
440    } else {
441        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
442    };
443
444    // Check for an object in the catalog with this same name
445    let full_name = scx.catalog.resolve_full_name(&name);
446    let partial_name = PartialItemName::from(full_name.clone());
447    // For PostgreSQL compatibility, we need to prevent creating tables when
448    // there is an existing object *or* type of the same name.
449    if let (false, Ok(item)) = (
450        if_not_exists,
451        scx.catalog.resolve_item_or_type(&partial_name),
452    ) {
453        return Err(PlanError::ItemAlreadyExists {
454            name: full_name.to_string(),
455            item_type: item.item_type(),
456        });
457    }
458
459    let desc = RelationDesc::new(typ, names);
460    let mut desc = VersionedRelationDesc::new(desc);
461    for (version, (_action, name, typ)) in changes.into_iter() {
462        let new_version = desc.add_column(name, typ);
463        if version != new_version {
464            return Err(PlanError::InvalidTable {
465                name: full_name.item,
466            });
467        }
468    }
469
470    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;
471
472    // Table options should only consider the original columns, since those
473    // were the only ones in scope when the table was created.
474    //
475    // TODO(alter_table): Will need to reconsider this when we support ALTERing
476    // the PARTITION BY columns.
477    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
478    let options = plan_table_options(scx, &original_desc, with_options.clone())?;
479
480    let compaction_window = options.iter().find_map(|o| {
481        #[allow(irrefutable_let_patterns)]
482        if let crate::plan::TableOption::RetainHistory(lcw) = o {
483            Some(lcw.clone())
484        } else {
485            None
486        }
487    });
488
489    let table = Table {
490        create_sql,
491        desc,
492        temporary,
493        compaction_window,
494        data_source: TableDataSource::TableWrites { defaults },
495    };
496    Ok(Plan::CreateTable(CreateTablePlan {
497        name,
498        table,
499        if_not_exists: *if_not_exists,
500    }))
501}
502
503pub fn describe_create_table_from_source(
504    _: &StatementContext,
505    _: CreateTableFromSourceStatement<Aug>,
506) -> Result<StatementDesc, PlanError> {
507    Ok(StatementDesc::new(None))
508}
509
510pub fn describe_create_webhook_source(
511    _: &StatementContext,
512    _: CreateWebhookSourceStatement<Aug>,
513) -> Result<StatementDesc, PlanError> {
514    Ok(StatementDesc::new(None))
515}
516
517pub fn describe_create_source(
518    _: &StatementContext,
519    _: CreateSourceStatement<Aug>,
520) -> Result<StatementDesc, PlanError> {
521    Ok(StatementDesc::new(None))
522}
523
524pub fn describe_create_subsource(
525    _: &StatementContext,
526    _: CreateSubsourceStatement<Aug>,
527) -> Result<StatementDesc, PlanError> {
528    Ok(StatementDesc::new(None))
529}
530
531generate_extracted_config!(
532    CreateSourceOption,
533    (TimestampInterval, Duration),
534    (RetainHistory, OptionalDuration)
535);
536
537generate_extracted_config!(
538    PgConfigOption,
539    (Details, String),
540    (Publication, String),
541    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
542    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
543);
544
545generate_extracted_config!(
546    MySqlConfigOption,
547    (Details, String),
548    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
549    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
550);
551
552generate_extracted_config!(
553    SqlServerConfigOption,
554    (Details, String),
555    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
556    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
557);
558
559pub fn plan_create_webhook_source(
560    scx: &StatementContext,
561    mut stmt: CreateWebhookSourceStatement<Aug>,
562) -> Result<Plan, PlanError> {
563    if stmt.is_table {
564        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
565    }
566
567    // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value
568    // we plan to normalize when we canonicalize the create statement.
569    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
570    let create_sql =
571        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;
572
573    let CreateWebhookSourceStatement {
574        name,
575        if_not_exists,
576        body_format,
577        include_headers,
578        validate_using,
579        is_table,
580        // We resolved `in_cluster` above, so we want to ignore it here.
581        in_cluster: _,
582    } = stmt;
583
584    let validate_using = validate_using
585        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
586        .transpose()?;
587    if let Some(WebhookValidation { expression, .. }) = &validate_using {
588        // If the validation expression doesn't reference any part of the request, then we should
589        // return an error because it's almost definitely wrong.
590        if !expression.contains_column() {
591            return Err(PlanError::WebhookValidationDoesNotUseColumns);
592        }
593        // Validation expressions cannot contain unmaterializable functions, except `now()`. We
594        // allow calls to `now()` because some webhook providers recommend rejecting requests that
595        // are older than a certain threshold.
596        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
597            return Err(PlanError::WebhookValidationNonDeterministic);
598        }
599    }
600
601    let body_format = match body_format {
602        Format::Bytes => WebhookBodyFormat::Bytes,
603        Format::Json { array } => WebhookBodyFormat::Json { array },
604        Format::Text => WebhookBodyFormat::Text,
605        // TODO(parkmycar): Make an issue to support more types, or change this to NeverSupported.
606        ty => {
607            return Err(PlanError::Unsupported {
608                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
609                discussion_no: None,
610            });
611        }
612    };
613
614    let mut column_ty = vec![
615        // Always include the body of the request as the first column.
616        SqlColumnType {
617            scalar_type: SqlScalarType::from(body_format),
618            nullable: false,
619        },
620    ];
621    let mut column_names = vec!["body".to_string()];
622
623    let mut headers = WebhookHeaders::default();
624
625    // Include a `headers` column, possibly filtered.
626    if let Some(filters) = include_headers.column {
627        column_ty.push(SqlColumnType {
628            scalar_type: SqlScalarType::Map {
629                value_type: Box::new(SqlScalarType::String),
630                custom_id: None,
631            },
632            nullable: false,
633        });
634        column_names.push("headers".to_string());
635
636        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
637            filters.into_iter().partition_map(|filter| {
638                if filter.block {
639                    itertools::Either::Right(filter.header_name)
640                } else {
641                    itertools::Either::Left(filter.header_name)
642                }
643            });
644        headers.header_column = Some(WebhookHeaderFilters { allow, block });
645    }
646
647    // Map headers to specific columns.
648    for header in include_headers.mappings {
649        let scalar_type = header
650            .use_bytes
651            .then_some(SqlScalarType::Bytes)
652            .unwrap_or(SqlScalarType::String);
653        column_ty.push(SqlColumnType {
654            scalar_type,
655            nullable: true,
656        });
657        column_names.push(header.column_name.into_string());
658
659        let column_idx = column_ty.len() - 1;
660        // Double check we're consistent with column names.
661        assert_eq!(
662            column_idx,
663            column_names.len() - 1,
664            "header column names and types don't match"
665        );
666        headers
667            .mapped_headers
668            .insert(column_idx, (header.header_name, header.use_bytes));
669    }
670
671    // Validate our columns.
672    let mut unique_check = HashSet::with_capacity(column_names.len());
673    for name in &column_names {
674        if !unique_check.insert(name) {
675            return Err(PlanError::AmbiguousColumn(name.clone().into()));
676        }
677    }
678    if column_names.len() > MAX_NUM_COLUMNS {
679        return Err(PlanError::TooManyColumns {
680            max_num_columns: MAX_NUM_COLUMNS,
681            req_num_columns: column_names.len(),
682        });
683    }
684
685    let typ = SqlRelationType::new(column_ty);
686    let desc = RelationDesc::new(typ, column_names);
687
688    // Check for an object in the catalog with this same name
689    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
690    let full_name = scx.catalog.resolve_full_name(&name);
691    let partial_name = PartialItemName::from(full_name.clone());
692    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
693        return Err(PlanError::ItemAlreadyExists {
694            name: full_name.to_string(),
695            item_type: item.item_type(),
696        });
697    }
698
699    // Note(parkmycar): We don't currently support specifying a timeline for Webhook sources. As
700    // such, we always use a default of EpochMilliseconds.
701    let timeline = Timeline::EpochMilliseconds;
702
703    let plan = if is_table {
704        let data_source = DataSourceDesc::Webhook {
705            validate_using,
706            body_format,
707            headers,
708            cluster_id: Some(in_cluster.id()),
709        };
710        let data_source = TableDataSource::DataSource {
711            desc: data_source,
712            timeline,
713        };
714        Plan::CreateTable(CreateTablePlan {
715            name,
716            if_not_exists,
717            table: Table {
718                create_sql,
719                desc: VersionedRelationDesc::new(desc),
720                temporary: false,
721                compaction_window: None,
722                data_source,
723            },
724        })
725    } else {
726        let data_source = DataSourceDesc::Webhook {
727            validate_using,
728            body_format,
729            headers,
730            // Important: The cluster is set at the `Source` level.
731            cluster_id: None,
732        };
733        Plan::CreateSource(CreateSourcePlan {
734            name,
735            source: Source {
736                create_sql,
737                data_source,
738                desc,
739                compaction_window: None,
740            },
741            if_not_exists,
742            timeline,
743            in_cluster: Some(in_cluster.id()),
744        })
745    };
746
747    Ok(plan)
748}
749
/// Plans a `CREATE SOURCE` statement into a [`Plan::CreateSource`].
///
/// The statement is expected to have already been purified: its external references
/// must have been cleared, and a progress subsource, if present, must be a named item.
///
/// Planning proceeds roughly as follows:
/// 1. Reject primary-output options (`ENVELOPE`, `FORMAT`, `INCLUDE`) when the source
///    table syntax is forced. Validate which `INCLUDE` options the connection type and
///    envelope support.
/// 2. Plan the external connection and extract the `WITH` options.
/// 3. Derive the relation description of the primary export from the envelope and
///    format. Apply any user-supplied column names and an optional
///    `PRIMARY KEY NOT ENFORCED` constraint.
/// 4. Resolve the timestamp interval. It is validated against the system bounds only
///    when a planning context is present (i.e. for new statements).
/// 5. Build the data source:
///    - With a progress subsource, an old-syntax ingestion that carries the primary
///      export.
///    - Otherwise, a plain ingestion whose description is the connection's timestamp
///      description.
/// 6. Check for name collisions, resolve the cluster, and normalize the statement into
///    the `create_sql` stored in the plan.
///
/// # Errors
///
/// Returns a [`PlanError`] if any of the validations above fail. It also fails if an
/// item or type with the same name already exists and `IF NOT EXISTS` was not given.
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Borrow the statement's fields. `stmt` itself is later mutated (to canonicalize
    // `in_cluster`) and then consumed when the statement is normalized.
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    // Purification is responsible for clearing external references before planning.
    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // If the new source table syntax is forced all the options related to the primary
    // source output should be un-set.
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    // A missing ENVELOPE clause is treated as `ENVELOPE NONE`.
    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    // INCLUDE HEADERS is Kafka-only. Other INCLUDE metadata is allowed only for Kafka
    // and load generator sources.
    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        // TODO(guswynn): should this be `bail_unsupported!`?
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    // Only Kafka connections contribute metadata columns to the primary export.
    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Generate the relation description for the primary export of the source.
    // Note: from here on `envelope` is the planned `SourceEnvelope`, shadowing the AST
    // envelope above.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    // Column names must be unique after any user-supplied renames.
    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Apply user-specified key constraint
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        // Don't remove this without addressing
        // https://github.com/MaterializeInc/database-issues/issues/4371.
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        // Reject key constraints that name the same column twice.
        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        // Resolve each key column to an index, requiring an unambiguous match.
        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        // If the planned description already has a key, the user-declared key is
        // rejected rather than added alongside it.
        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            // Only validate bounds for new statements (pcx is Some), not during
            // catalog deserialization (pcx is None). Previously persisted sources
            // may have intervals that no longer fall within the current bounds.
            if scx.pcx.is_some() {
                let min = scx.catalog.system_vars().min_timestamp_interval();
                let max = scx.catalog.system_vars().max_timestamp_interval();
                if duration < min || duration > max {
                    return Err(PlanError::InvalidTimestampInterval {
                        min,
                        max,
                        requested: duration,
                    });
                }
            }
            duration
        }
        None => scx.catalog.system_vars().default_timestamp_interval(),
    };

    // The two branches below build different data sources:
    // - With a progress subsource (old syntax), the source itself ingests the primary
    //   export described by `desc`.
    // - Without one, the source's description is the connection's timestamp
    //   description, and the primary-export `desc` computed above is not used.
    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            // Per-connection export details for the primary export.
            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating sources when
    // there is an existing object *or* type of the same name.
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // We will rewrite the cluster if one is not provided, so we must use the
    // `in_cluster` value we plan to normalize when we canonicalize the create
    // statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // Determine a default timeline for the source.
    // Sources planned with the CdcV2 envelope (ENVELOPE MATERIALIZE) get an external
    // timeline named after the source. All others use epoch milliseconds.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}
1018
1019pub fn plan_generic_source_connection(
1020    scx: &StatementContext<'_>,
1021    source_connection: &CreateSourceConnection<Aug>,
1022    include_metadata: &Vec<SourceIncludeMetadata>,
1023) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
1024    Ok(match source_connection {
1025        CreateSourceConnection::Kafka {
1026            connection,
1027            options,
1028        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
1029            scx,
1030            connection,
1031            options,
1032            include_metadata,
1033        )?),
1034        CreateSourceConnection::Postgres {
1035            connection,
1036            options,
1037        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
1038            scx, connection, options,
1039        )?),
1040        CreateSourceConnection::SqlServer {
1041            connection,
1042            options,
1043        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
1044            scx, connection, options,
1045        )?),
1046        CreateSourceConnection::MySql {
1047            connection,
1048            options,
1049        } => {
1050            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
1051        }
1052        CreateSourceConnection::LoadGenerator { generator, options } => {
1053            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
1054                scx,
1055                generator,
1056                options,
1057                include_metadata,
1058            )?)
1059        }
1060    })
1061}
1062
1063fn plan_load_generator_source_connection(
1064    scx: &StatementContext<'_>,
1065    generator: &ast::LoadGenerator,
1066    options: &Vec<LoadGeneratorOption<Aug>>,
1067    include_metadata: &Vec<SourceIncludeMetadata>,
1068) -> Result<LoadGeneratorSourceConnection, PlanError> {
1069    let load_generator =
1070        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
1071    let LoadGeneratorOptionExtracted {
1072        tick_interval,
1073        as_of,
1074        up_to,
1075        ..
1076    } = options.clone().try_into()?;
1077    let tick_micros = match tick_interval {
1078        Some(interval) => Some(interval.as_micros().try_into()?),
1079        None => None,
1080    };
1081    if up_to < as_of {
1082        sql_bail!("UP TO cannot be less than AS OF");
1083    }
1084    Ok(LoadGeneratorSourceConnection {
1085        load_generator,
1086        tick_micros,
1087        as_of,
1088        up_to,
1089    })
1090}
1091
1092fn plan_mysql_source_connection(
1093    scx: &StatementContext<'_>,
1094    connection: &ResolvedItemName,
1095    options: &Vec<MySqlConfigOption<Aug>>,
1096) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
1097    let connection_item = scx.get_item_by_resolved_name(connection)?;
1098    match connection_item.connection()? {
1099        Connection::MySql(connection) => connection,
1100        _ => sql_bail!(
1101            "{} is not a MySQL connection",
1102            scx.catalog.resolve_full_name(connection_item.name())
1103        ),
1104    };
1105    let MySqlConfigOptionExtracted {
1106        details,
1107        // text/exclude columns are already part of the source-exports and are only included
1108        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1109        // be removed once we drop support for implicitly created subsources.
1110        text_columns: _,
1111        exclude_columns: _,
1112        seen: _,
1113    } = options.clone().try_into()?;
1114    let details = details
1115        .as_ref()
1116        .ok_or_else(|| internal_err!("MySQL source missing details"))?;
1117    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1118    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1119    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1120    Ok(MySqlSourceConnection {
1121        connection: connection_item.id(),
1122        connection_id: connection_item.id(),
1123        details,
1124    })
1125}
1126
1127fn plan_sqlserver_source_connection(
1128    scx: &StatementContext<'_>,
1129    connection: &ResolvedItemName,
1130    options: &Vec<SqlServerConfigOption<Aug>>,
1131) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
1132    let connection_item = scx.get_item_by_resolved_name(connection)?;
1133    match connection_item.connection()? {
1134        Connection::SqlServer(connection) => connection,
1135        _ => sql_bail!(
1136            "{} is not a SQL Server connection",
1137            scx.catalog.resolve_full_name(connection_item.name())
1138        ),
1139    };
1140    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
1141    let details = details
1142        .as_ref()
1143        .ok_or_else(|| internal_err!("SQL Server source missing details"))?;
1144    let extras = hex::decode(details)
1145        .map_err(|e| sql_err!("{e}"))
1146        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
1147        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
1148    Ok(SqlServerSourceConnection {
1149        connection_id: connection_item.id(),
1150        connection: connection_item.id(),
1151        extras,
1152    })
1153}
1154
1155fn plan_postgres_source_connection(
1156    scx: &StatementContext<'_>,
1157    connection: &ResolvedItemName,
1158    options: &Vec<PgConfigOption<Aug>>,
1159) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
1160    let connection_item = scx.get_item_by_resolved_name(connection)?;
1161    let PgConfigOptionExtracted {
1162        details,
1163        publication,
1164        // text columns are already part of the source-exports and are only included
1165        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1166        // be removed once we drop support for implicitly created subsources.
1167        text_columns: _,
1168        // exclude columns are already part of the source-exports and are only included
1169        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1170        // be removed once we drop support for implicitly created subsources.
1171        exclude_columns: _,
1172        seen: _,
1173    } = options.clone().try_into()?;
1174    let details = details
1175        .as_ref()
1176        .ok_or_else(|| internal_err!("Postgres source missing details"))?;
1177    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1178    let details =
1179        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1180    let publication_details =
1181        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1182    Ok(PostgresSourceConnection {
1183        connection: connection_item.id(),
1184        connection_id: connection_item.id(),
1185        // Validated during purification.
1186        publication: publication.ok_or_else(|| internal_err!("PUBLICATION option is required"))?,
1187        publication_details,
1188    })
1189}
1190
1191fn plan_kafka_source_connection(
1192    scx: &StatementContext<'_>,
1193    connection_name: &ResolvedItemName,
1194    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
1195    include_metadata: &Vec<SourceIncludeMetadata>,
1196) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
1197    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
1198    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
1199        sql_bail!(
1200            "{} is not a kafka connection",
1201            scx.catalog.resolve_full_name(connection_item.name())
1202        )
1203    }
1204    let KafkaSourceConfigOptionExtracted {
1205        group_id_prefix,
1206        topic,
1207        topic_metadata_refresh_interval,
1208        start_timestamp: _, // purified into `start_offset`
1209        start_offset,
1210        seen: _,
1211    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
1212    // Validated during purification.
1213    let topic = topic.ok_or_else(|| internal_err!("TOPIC option is required"))?;
1214    let mut start_offsets = BTreeMap::new();
1215    if let Some(offsets) = start_offset {
1216        for (part, offset) in offsets.iter().enumerate() {
1217            if *offset < 0 {
1218                sql_bail!("START OFFSET must be a nonnegative integer");
1219            }
1220            start_offsets.insert(i32::try_from(part)?, *offset);
1221        }
1222    }
1223    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
1224        // This is a librdkafka-enforced restriction that, if violated,
1225        // would result in a runtime error for the source.
1226        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
1227    }
1228    if topic_metadata_refresh_interval < Duration::from_secs(1) {
1229        // This is a librdkafka-enforced restriction that, if violated,
1230        // would result in a runtime error for the source.
1231        sql_bail!("TOPIC METADATA REFRESH INTERVAL must be at least 1 second");
1232    }
1233    let metadata_columns = include_metadata
1234        .into_iter()
1235        .flat_map(|item| match item {
1236            SourceIncludeMetadata::Timestamp { alias } => {
1237                let name = match alias {
1238                    Some(name) => name.to_string(),
1239                    None => "timestamp".to_owned(),
1240                };
1241                Some((name, KafkaMetadataKind::Timestamp))
1242            }
1243            SourceIncludeMetadata::Partition { alias } => {
1244                let name = match alias {
1245                    Some(name) => name.to_string(),
1246                    None => "partition".to_owned(),
1247                };
1248                Some((name, KafkaMetadataKind::Partition))
1249            }
1250            SourceIncludeMetadata::Offset { alias } => {
1251                let name = match alias {
1252                    Some(name) => name.to_string(),
1253                    None => "offset".to_owned(),
1254                };
1255                Some((name, KafkaMetadataKind::Offset))
1256            }
1257            SourceIncludeMetadata::Headers { alias } => {
1258                let name = match alias {
1259                    Some(name) => name.to_string(),
1260                    None => "headers".to_owned(),
1261                };
1262                Some((name, KafkaMetadataKind::Headers))
1263            }
1264            SourceIncludeMetadata::Header {
1265                alias,
1266                key,
1267                use_bytes,
1268            } => Some((
1269                alias.to_string(),
1270                KafkaMetadataKind::Header {
1271                    key: key.clone(),
1272                    use_bytes: *use_bytes,
1273                },
1274            )),
1275            SourceIncludeMetadata::Key { .. } => {
1276                // handled below
1277                None
1278            }
1279        })
1280        .collect();
1281    Ok(KafkaSourceConnection {
1282        connection: connection_item.id(),
1283        connection_id: connection_item.id(),
1284        topic,
1285        start_offsets,
1286        group_id_prefix,
1287        topic_metadata_refresh_interval,
1288        metadata_columns,
1289    })
1290}
1291
/// Applies a source export's envelope and optional format to the descriptions produced
/// by its connection.
///
/// Parameters:
/// - `key_desc` and `value_desc` are the connection's default key and value
///   descriptions. When `format` is set, the value must be a single `bytes` column, and
///   both descriptions are replaced by those derived from the decoded encoding.
/// - `include_metadata` is used to determine the key envelope (`INCLUDE KEY`).
/// - `metadata_columns_desc` is passed to the planned envelope when computing the final
///   description.
///
/// Returns the export's relation description, its planned envelope, and its data
/// encoding, if any.
///
/// # Errors
///
/// Fails in these cases:
/// - the value description is incompatible with format decoding;
/// - `INCLUDE KEY` is combined with `ENVELOPE DEBEZIUM`;
/// - the value does not typecheck for `ENVELOPE DEBEZIUM`;
/// - `ENVELOPE UPSERT` lacks a key format, except for `KEY VALUE` load generators;
/// - an unsupported option is used or a required feature flag is off;
/// - computing the final description fails.
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    // Resolve the FORMAT clause, if any, into a data encoding.
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // If we are applying an encoding we need to ensure that the incoming value_desc is a
            // single column of type bytes.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            // TODO(petrosagg): This piece of code seems to be making a statement about the
            // nullability of the NONE envelope when the source is Kafka. As written, the code
            // misses opportunities to mark columns as not nullable and is over conservative. For
            // example in the case of `FORMAT BYTES ENVELOPE NONE` the output is indeed
            // non-nullable but we will mark it as nullable anyway. This kind of crude reasoning
            // should be replaced with precise type-level reasoning.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        // Without a format, the connection's own descriptions are used unchanged.
        None => (key_desc, value_desc),
    };

    // KEY VALUE load generators are the only UPSERT source that
    // has no encoding but defaults to `INCLUDE KEY`.
    //
    // As discussed
    // <https://github.com/MaterializeInc/materialize/pull/26246#issuecomment-2023558097>,
    // removing this special case amounts to deciding how to handle null keys
    // from sources, in a holistic way. We aren't yet prepared to do this, so we leave
    // this special case in.
    //
    // Note that this is safe because this generator is
    // 1. The only source with no encoding that can have its key included.
    // 2. Never produces null keys (or values, for that matter).
    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    // Key envelope derived from the INCLUDE clauses. It may be overridden below for
    // ENVELOPE UPSERT.
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    // Not all source envelopes are compatible with all source connections.
    // Whoever constructs the source ingestion pipeline is responsible for
    // choosing compatible envelopes and connections.
    //
    // TODO(guswynn): ambiguously assert which connections and envelopes are
    // compatible in typechecking
    //
    // TODO: remove bails as more support for upsert is added.
    let envelope = match &envelope {
        // TODO: fixup key envelope
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            //TODO check that key envelope is not set
            // The value must typecheck as a Debezium record. On failure, report the type
            // error for Avro values and a format requirement error otherwise.
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            // UPSERT needs a key encoding, except for the KEY VALUE load generator
            // special case described above.
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            // `ENVELOPE UPSERT` implies `INCLUDE KEY`, if it is not explicitly
            // specified.
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            // If the value decode error policy is not set we use the default upsert style.
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            //TODO check that key envelope is not set
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    // Compute the planned envelope and the final relation description from the key,
    // value, and metadata descriptions.
    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}
1470
1471/// Plans the RelationDesc for a source export (subsource or table) that has a defined list
1472/// of columns and constraints.
1473fn plan_source_export_desc(
1474    scx: &StatementContext,
1475    name: &UnresolvedItemName,
1476    columns: &Vec<ColumnDef<Aug>>,
1477    constraints: &Vec<TableConstraint<Aug>>,
1478) -> Result<RelationDesc, PlanError> {
1479    let names: Vec<_> = columns
1480        .iter()
1481        .map(|c| normalize::column_name(c.name.clone()))
1482        .collect();
1483
1484    if let Some(dup) = names.iter().duplicates().next() {
1485        sql_bail!("column {} specified more than once", dup.quoted());
1486    }
1487
1488    // Build initial relation type that handles declared data types
1489    // and NOT NULL constraints.
1490    let mut column_types = Vec::with_capacity(columns.len());
1491    let mut keys = Vec::new();
1492
1493    for (i, c) in columns.into_iter().enumerate() {
1494        let aug_data_type = &c.data_type;
1495        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1496        let mut nullable = true;
1497        for option in &c.options {
1498            match &option.option {
1499                ColumnOption::NotNull => nullable = false,
1500                ColumnOption::Default(_) => {
1501                    bail_unsupported!("Source export with default value")
1502                }
1503                ColumnOption::Unique { is_primary } => {
1504                    keys.push(vec![i]);
1505                    if *is_primary {
1506                        nullable = false;
1507                    }
1508                }
1509                other => {
1510                    bail_unsupported!(format!("Source export with column constraint: {}", other))
1511                }
1512            }
1513        }
1514        column_types.push(ty.nullable(nullable));
1515    }
1516
1517    let mut seen_primary = false;
1518    'c: for constraint in constraints {
1519        match constraint {
1520            TableConstraint::Unique {
1521                name: _,
1522                columns,
1523                is_primary,
1524                nulls_not_distinct,
1525            } => {
1526                if seen_primary && *is_primary {
1527                    sql_bail!(
1528                        "multiple primary keys for source export {} are not allowed",
1529                        name.to_ast_string_stable()
1530                    );
1531                }
1532                seen_primary = *is_primary || seen_primary;
1533
1534                let mut key = vec![];
1535                for column in columns {
1536                    let column = normalize::column_name(column.clone());
1537                    match names.iter().position(|name| *name == column) {
1538                        None => sql_bail!("unknown column in constraint: {}", column),
1539                        Some(i) => {
1540                            let nullable = &mut column_types[i].nullable;
1541                            if *is_primary {
1542                                if *nulls_not_distinct {
1543                                    sql_bail!(
1544                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1545                                    );
1546                                }
1547                                *nullable = false;
1548                            } else if !(*nulls_not_distinct || !*nullable) {
1549                                // Non-primary key unique constraints are only keys if all of their
1550                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
1551                                break 'c;
1552                            }
1553
1554                            key.push(i);
1555                        }
1556                    }
1557                }
1558
1559                if *is_primary {
1560                    keys.insert(0, key);
1561                } else {
1562                    keys.push(key);
1563                }
1564            }
1565            TableConstraint::ForeignKey { .. } => {
1566                bail_unsupported!("Source export with a foreign key")
1567            }
1568            TableConstraint::Check { .. } => {
1569                bail_unsupported!("Source export with a check constraint")
1570            }
1571        }
1572    }
1573
1574    let typ = SqlRelationType::new(column_types).with_keys(keys);
1575    let desc = RelationDesc::new(typ, names);
1576    Ok(desc)
1577}
1578
// Options accepted in the `WITH (...)` options of `CREATE SUBSOURCE`. The generated
// `CreateSubsourceOptionExtracted` is obtained via `try_into` in `plan_create_subsource`.
// Entries with a `Default(..)` are read as plain values; entries without one are read as
// `Option`s (e.g. `external_reference` is checked with `is_some`).
generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);
1588
/// Plans a `CREATE SUBSOURCE` statement.
///
/// Subsource statements are generated during purification of `CREATE SOURCE`. Each one is
/// exactly one of:
/// * a progress subsource (the `PROGRESS` option), planned as [`DataSourceDesc::Progress`]; or
/// * an export of its parent ingestion (`of_source`), which must also carry an
///   `EXTERNAL REFERENCE` and a `DETAILS` option holding hex-encoded
///   `ProtoSourceExportStatementDetails` produced during purification.
///
/// The result is a [`Plan::CreateSource`] on the [`Timeline::EpochMilliseconds`] timeline with
/// `in_cluster: None`.
///
/// # Errors
///
/// Fails if the PROGRESS/REFERENCES invariant is violated, if the source-table syntax is forced
/// (`enable_create_table_from_source` and `force_source_table_syntax` both set) for a
/// non-progress subsource, if `DETAILS` is missing or cannot be decoded, if the details refer to
/// a Kafka source, or if planning the columns/constraints or `RETAIN HISTORY` fails.
pub fn plan_create_subsource(
    scx: &StatementContext,
    stmt: CreateSubsourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSubsourceStatement {
        name,
        columns,
        of_source,
        constraints,
        if_not_exists,
        with_options,
    } = &stmt;

    let CreateSubsourceOptionExtracted {
        progress,
        retain_history,
        external_reference,
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    // This invariant is enforced during purification; we are responsible for
    // creating the AST for subsources as a response to CREATE SOURCE
    // statements, so this would fire in integration testing if we failed to
    // uphold it. Exactly one of the following must hold: `PROGRESS` is set, or
    // both an EXTERNAL REFERENCE and `of_source` are present.
    if !(progress ^ (external_reference.is_some() && of_source.is_some())) {
        bail_internal!(
            "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
        );
    }

    let desc = plan_source_export_desc(scx, name, columns, constraints)?;

    let data_source = if let Some(source_reference) = of_source {
        // If the new source table syntax is forced we should not be creating any non-progress
        // subsources.
        if scx.catalog.system_vars().enable_create_table_from_source()
            && scx.catalog.system_vars().force_source_table_syntax()
        {
            Err(PlanError::UseTablesForSources(
                "CREATE SUBSOURCE".to_string(),
            ))?;
        }

        // This is a subsource with the "natural" dependency order, i.e. it is
        // not a legacy subsource with the inverted structure.
        let ingestion_id = *source_reference.item_id();
        let external_reference = external_reference.ok_or_else(|| {
            sql_err!("CREATE SUBSOURCE with REFERENCES requires EXTERNAL REFERENCE option")
        })?;

        // Decode the details option stored on the subsource statement, which contains information
        // created during the purification process. The option is a hex string wrapping an
        // encoded `ProtoSourceExportStatementDetails`.
        let details = details
            .as_ref()
            .ok_or_else(|| internal_err!("source-export subsource missing details"))?;
        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
        let details =
            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
        let details =
            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
        let details = match details {
            SourceExportStatementDetails::Postgres { table } => {
                // NOTE(review): `exclude_columns` is not consulted for Postgres exports here;
                // presumably purification rejects it for Postgres — confirm.
                SourceExportDetails::Postgres(PostgresSourceExportDetails {
                    column_casts: crate::pure::postgres::generate_column_casts(
                        scx,
                        &table,
                        &text_columns,
                    )?,
                    table,
                })
            }
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
                binlog_full_metadata,
            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
                table,
                initial_gtid_set,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
                binlog_full_metadata,
            }),
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
                capture_instance,
                table,
                initial_lsn,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::LoadGenerator { output } => {
                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
            }
            SourceExportStatementDetails::Kafka {} => {
                bail_unsupported!("subsources cannot reference Kafka sources")
            }
        };
        DataSourceDesc::IngestionExport {
            ingestion_id,
            external_reference,
            details,
            // Subsources don't currently support non-default envelopes / encoding
            data_config: SourceExportDataConfig {
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
                encoding: None,
            },
        }
    } else if progress {
        DataSourceDesc::Progress
    } else {
        // Unreachable in practice: the invariant check above already rejects statements
        // that have neither `PROGRESS` nor `of_source`.
        sql_bail!("CREATE SUBSOURCE must specify one of PROGRESS or REFERENCES option")
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline: Timeline::EpochMilliseconds,
        in_cluster: None,
    }))
}
1738
// Options accepted in the `WITH (...)` options of `CREATE TABLE ... FROM SOURCE`. The generated
// `TableFromSourceOptionExtracted` is obtained via `try_into` in `plan_create_table_from_source`.
// `TextColumns` and `ExcludeColumns` default to empty lists; the other options are read as
// `Option`s.
generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);
1747
/// Plans a `CREATE TABLE ... FROM SOURCE` statement, which exports one upstream reference of an
/// existing source (`source`) as a table.
///
/// The statement's `DETAILS` option (hex-encoded `ProtoSourceExportStatementDetails`, written
/// during purification) determines the kind of export. The relation description comes either
/// from the statement's explicit column definitions/constraints or from the source connection's
/// default key/value descriptions. It is then shaped by the requested `FORMAT`, `ENVELOPE` and
/// `INCLUDE` metadata via `apply_source_envelope_encoding`, and optionally renamed by a named
/// column list.
///
/// # Errors
///
/// Fails if `enable_create_table_from_source` is off, if `DETAILS` is missing or cannot be
/// decoded, if `INCLUDE` metadata is used with an unsupported source type or envelope, if the
/// resulting columns contain duplicates, if `PARTITION BY` is disallowed or invalid, or if the
/// `EXTERNAL REFERENCE` is missing.
pub fn plan_create_table_from_source(
    scx: &StatementContext,
    stmt: CreateTableFromSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if !scx.catalog.system_vars().enable_create_table_from_source() {
        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
    }

    let CreateTableFromSourceStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        source,
        external_reference,
        envelope,
        format,
        include_metadata,
        with_options,
    } = &stmt;

    // No explicit envelope means `ENVELOPE NONE`.
    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    let TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        retain_history,
        partition_by,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    let source_item = scx.get_item_by_resolved_name(source)?;
    let ingestion_id = source_item.id();

    // Decode the details option stored on the statement, which contains information
    // created during the purification process.
    let details = details
        .as_ref()
        .ok_or_else(|| internal_err!("source-export missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details =
        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        // TODO(guswynn): should this be `bail_unsupported!`?
        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
    }
    // Only Kafka and load generator exports accept any INCLUDE metadata at all (the error
    // message below mentions only Kafka).
    if !matches!(
        details,
        SourceExportStatementDetails::Kafka { .. }
            | SourceExportStatementDetails::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
    }

    let details = match details {
        SourceExportStatementDetails::Postgres { table } => {
            SourceExportDetails::Postgres(PostgresSourceExportDetails {
                column_casts: crate::pure::postgres::generate_column_casts(
                    scx,
                    &table,
                    &text_columns,
                )?,
                table,
            })
        }
        SourceExportStatementDetails::MySql {
            table,
            initial_gtid_set,
            binlog_full_metadata,
        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
            table,
            initial_gtid_set,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
            binlog_full_metadata,
        }),
        SourceExportStatementDetails::SqlServer {
            table,
            capture_instance,
            initial_lsn,
        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
            table,
            capture_instance,
            initial_lsn,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::LoadGenerator { output } => {
            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
        }
        SourceExportStatementDetails::Kafka {} => {
            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                // TODO(guswynn): should this be `bail_unsupported!`?
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            // Map each INCLUDE item to a (column name, metadata kind) pair, using the alias
            // when one was given and a fixed default name otherwise.
            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
                        // handled below (`include_metadata` is passed to
                        // `apply_source_envelope_encoding`)
                        None
                    }
                })
                .collect();

            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
        }
    };

    let source_connection = &source_item
        .source_desc()?
        .ok_or_else(|| sql_err!("item is not a source"))?
        .connection;

    // Some source-types (e.g. postgres, mysql, multi-output load-gen sources) define a value_schema
    // during purification and define the `columns` and `constraints` fields for the statement,
    // whereas other source-types (e.g. kafka, single-output load-gen sources) do not, so instead
    // we use the source connection's default schema.
    let (key_desc, value_desc) =
        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
            let columns = match columns {
                TableFromSourceColumns::Defined(columns) => columns,
                _ => bail_internal!("expected column definitions to be present"),
            };
            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
            (None, desc)
        } else {
            let key_desc = source_connection.default_key_desc();
            let value_desc = source_connection.default_value_desc();
            (Some(key_desc), value_desc)
        };

    let metadata_columns_desc = match &details {
        SourceExportDetails::Kafka(KafkaSourceExportDetails {
            metadata_columns, ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        key_desc,
        value_desc,
        include_metadata,
        metadata_columns_desc,
        source_connection,
    )?;
    if let TableFromSourceColumns::Named(col_names) = columns {
        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
    }

    // Envelope, metadata and renaming can all introduce names, so re-check for duplicates.
    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Determine the table's timeline: `ENVELOPE MATERIALIZE` (`CdcV2`) exports get an
    // external timeline named after the table; everything else uses epoch milliseconds.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let data_source = DataSourceDesc::IngestionExport {
        ingestion_id,
        // Populated during purification.
        external_reference: external_reference
            .as_ref()
            .ok_or_else(|| sql_err!("EXTERNAL REFERENCE is required"))?
            .clone(),
        details,
        data_config: SourceExportDataConfig { envelope, encoding },
    };

    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let table = Table {
        create_sql,
        desc: VersionedRelationDesc::new(desc),
        temporary: false,
        compaction_window,
        data_source: TableDataSource::DataSource {
            desc: data_source,
            timeline,
        },
    };

    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists,
    }))
}
2017
// Options for load generator sources, extracted into `LoadGeneratorOptionExtracted`.
// `AsOf` defaults to 0 and `UpTo` to `u64::MAX`; the rest are read as `Option`s.
// `LoadGeneratorOptionExtracted::ensure_only_valid_options` restricts which of these each
// generator accepts.
generate_extracted_config!(
    LoadGeneratorOption,
    (TickInterval, Duration),
    (AsOf, u64, Default(0_u64)),
    (UpTo, u64, Default(u64::MAX)),
    (ScaleFactor, f64),
    (MaxCardinality, u64),
    (Keys, u64),
    (SnapshotRounds, u64),
    (TransactionalSnapshot, bool),
    (ValueSize, u64),
    (Seed, u64),
    (Partitions, u64),
    (BatchSize, u64)
);
2033
2034impl LoadGeneratorOptionExtracted {
2035    pub(super) fn ensure_only_valid_options(
2036        &self,
2037        loadgen: &ast::LoadGenerator,
2038    ) -> Result<(), PlanError> {
2039        use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2040
2041        let mut options = self.seen.clone();
2042
2043        let permitted_options: &[_] = match loadgen {
2044            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2045            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2046            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2047            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2048            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2049            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2050            ast::LoadGenerator::KeyValue => &[
2051                TickInterval,
2052                Keys,
2053                SnapshotRounds,
2054                TransactionalSnapshot,
2055                ValueSize,
2056                Seed,
2057                Partitions,
2058                BatchSize,
2059            ],
2060        };
2061
2062        for o in permitted_options {
2063            options.remove(o);
2064        }
2065
2066        if !options.is_empty() {
2067            sql_bail!(
2068                "{} load generators do not support {} values",
2069                loadgen,
2070                options.iter().join(", ")
2071            )
2072        }
2073
2074        Ok(())
2075    }
2076}
2077
/// Converts a parsed load generator and its options into a planned [`LoadGenerator`].
///
/// Options are first checked against the generator's allow-list. `INCLUDE` metadata is only
/// accepted for `KEY VALUE` generators, and for those only `INCLUDE OFFSET` and `INCLUDE KEY`.
/// The clock, counter, datums and key-value generators are additionally gated behind feature
/// flags.
///
/// # Errors
///
/// Fails on disallowed options or metadata, disabled feature flags, an invalid TPCH scale
/// factor, or missing/inconsistent `KEY VALUE` options.
pub(crate) fn load_generator_ast_to_generator(
    scx: &StatementContext,
    loadgen: &ast::LoadGenerator,
    options: &[LoadGeneratorOption<Aug>],
    include_metadata: &[SourceIncludeMetadata],
) -> Result<LoadGenerator, PlanError> {
    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
    extracted.ensure_only_valid_options(loadgen)?;

    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
        sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
    }

    let load_generator = match loadgen {
        ast::LoadGenerator::Auction => LoadGenerator::Auction,
        ast::LoadGenerator::Clock => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
            LoadGenerator::Clock
        }
        ast::LoadGenerator::Counter => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
            let LoadGeneratorOptionExtracted {
                max_cardinality, ..
            } = extracted;
            LoadGenerator::Counter { max_cardinality }
        }
        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
        ast::LoadGenerator::Datums => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
            LoadGenerator::Datums
        }
        ast::LoadGenerator::Tpch => {
            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;

            // Default to 0.01 scale factor (=10MB).
            let sf: f64 = scale_factor.unwrap_or(0.01);
            if !sf.is_finite() || sf < 0.0 {
                sql_bail!("unsupported scale factor {sf}");
            }

            // Scales a per-table base row count by `sf`, rounding down and clamping to at
            // least 1 row; values that do not fit in an i64 are rejected.
            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
                let total = (sf * multiplier).floor();
                let mut i = i64::try_cast_from(total)
                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
                if i < 1 {
                    i = 1;
                }
                Ok(i)
            };

            // The multiplications here are safely unchecked because they will
            // overflow to infinity, which `i64::try_cast_from` in `f_to_i` rejects.
            let count_supplier = f_to_i(10_000f64)?;
            let count_part = f_to_i(200_000f64)?;
            let count_customer = f_to_i(150_000f64)?;
            let count_orders = f_to_i(150_000f64 * 10f64)?;
            let count_clerk = f_to_i(1_000f64)?;

            LoadGenerator::Tpch {
                count_supplier,
                count_part,
                count_customer,
                count_orders,
                count_clerk,
            }
        }
        mz_sql_parser::ast::LoadGenerator::KeyValue => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
            let LoadGeneratorOptionExtracted {
                keys,
                snapshot_rounds,
                transactional_snapshot,
                value_size,
                tick_interval,
                seed,
                partitions,
                batch_size,
                ..
            } = extracted;

            // `INCLUDE OFFSET` names the offset column (alias or the default name);
            // `INCLUDE KEY` needs nothing here; anything else is rejected.
            let mut include_offset = None;
            for im in include_metadata {
                match im {
                    SourceIncludeMetadata::Offset { alias } => {
                        include_offset = match alias {
                            Some(alias) => Some(alias.to_string()),
                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
                        }
                    }
                    SourceIncludeMetadata::Key { .. } => continue,

                    _ => {
                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
                    }
                };
            }

            let lgkv = KeyValueLoadGenerator {
                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
                snapshot_rounds: snapshot_rounds
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
                // Defaults to true.
                transactional_snapshot: transactional_snapshot.unwrap_or(true),
                value_size: value_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
                partitions: partitions
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
                tick_interval,
                batch_size: batch_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
                include_offset,
            };

            // The zero checks also guard the `%` and `/` operations below against division
            // by zero.
            if lgkv.keys == 0
                || lgkv.partitions == 0
                || lgkv.value_size == 0
                || lgkv.batch_size == 0
            {
                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
            }

            if lgkv.keys % lgkv.partitions != 0 {
                sql_bail!("KEYS must be a multiple of PARTITIONS")
            }

            if lgkv.batch_size > lgkv.keys {
                sql_bail!("KEYS must be larger than BATCH SIZE")
            }

            // This constraint simplifies the source implementation.
            // We can lift it later.
            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
            }

            if lgkv.snapshot_rounds == 0 {
                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
            }

            LoadGenerator::KeyValue(lgkv)
        }
    };

    Ok(load_generator)
}
2224
2225fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2226    let before = value_desc.get_by_name(&"before".into());
2227    let (after_idx, after_ty) = value_desc
2228        .get_by_name(&"after".into())
2229        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2230    let before_idx = if let Some((before_idx, before_ty)) = before {
2231        if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2232            sql_bail!("'before' column must be of type record");
2233        }
2234        if before_ty != after_ty {
2235            sql_bail!("'before' type differs from 'after' column");
2236        }
2237        Some(before_idx)
2238    } else {
2239        None
2240    };
2241    Ok((before_idx, after_idx))
2242}
2243
2244fn get_encoding(
2245    scx: &StatementContext,
2246    format: &FormatSpecifier<Aug>,
2247    envelope: &ast::SourceEnvelope,
2248) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2249    let encoding = match format {
2250        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2251        FormatSpecifier::KeyValue { key, value } => {
2252            let key = {
2253                let encoding = get_encoding_inner(scx, key)?;
2254                Some(encoding.key.unwrap_or(encoding.value))
2255            };
2256            let value = get_encoding_inner(scx, value)?.value;
2257            SourceDataEncoding { key, value }
2258        }
2259    };
2260
2261    let requires_keyvalue = matches!(
2262        envelope,
2263        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2264    );
2265    let is_keyvalue = encoding.key.is_some();
2266    if requires_keyvalue && !is_keyvalue {
2267        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2268    };
2269
2270    Ok(encoding)
2271}
2272
2273/// Determine the cluster ID to use for this item.
2274///
2275/// If `in_cluster` is `None` we will update it to refer to the default cluster.
2276/// Because of this, do not normalize/canonicalize the create SQL statement
2277/// until after calling this function.
2278fn source_sink_cluster_config<'a, 'ctx>(
2279    scx: &'a StatementContext<'ctx>,
2280    in_cluster: &mut Option<ResolvedClusterName>,
2281) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2282    let cluster = match in_cluster {
2283        None => {
2284            let cluster = scx.catalog.resolve_cluster(None)?;
2285            *in_cluster = Some(ResolvedClusterName {
2286                id: cluster.id(),
2287                print_name: None,
2288            });
2289            cluster
2290        }
2291        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2292    };
2293
2294    Ok(cluster)
2295}
2296
// Generates `AvroSchemaOptionExtracted`, the typed form of the `WITH (...)`
// options on an inline Avro schema (consumed in `get_encoding_inner`). The
// `CONFLUENT WIRE FORMAT` option is a `bool` that defaults to `true`.
generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2298
/// Avro schema information gathered from an `AvroSchema` AST node.
///
/// `get_encoding_inner` builds one of these either from an inline schema or
/// from a resolved schema-registry (CSR) seed, then turns it into Avro
/// key/value encodings.
#[derive(Debug)]
pub struct Schema {
    /// The key schema, if any. Inline Avro schemas never provide one.
    pub key_schema: Option<String>,
    /// The value schema.
    pub value_schema: String,
    /// Reference schemas for the key schema, in dependency order.
    pub key_reference_schemas: Vec<String>,
    /// Reference schemas for the value schema, in dependency order.
    pub value_reference_schemas: Vec<String>,
    /// The schema registry connection associated with the schemas; `None` for
    /// inline schemas.
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    /// Whether data is framed in the Confluent wire format. `get_encoding_inner`
    /// sets this to `true` for CSR schemas and takes it from the
    /// `CONFLUENT WIRE FORMAT` option for inline schemas.
    pub confluent_wire_format: bool,
}
2310
/// Translates a single `Format` into a [`SourceDataEncoding`].
///
/// Most formats only produce a value encoding (`key: None`). The exceptions are
/// schema-registry (CSR) backed Avro and Protobuf formats whose resolved seed
/// includes a key schema. For those, both a key and a value encoding are
/// returned.
///
/// # Errors
///
/// Errors include:
/// - a CSR connection that does not name a schema registry connection;
/// - a missing CSR seed ("seed resolution has not been performed");
/// - options on a Protobuf CSR connection;
/// - invalid inline Avro options;
/// - unparseable Protobuf descriptor bytes, regex, or CSV delimiter;
/// - a CSV header spec without names;
/// - `FORMAT JSON ARRAY`, which is unsupported in sources.
fn get_encoding_inner(
    scx: &StatementContext,
    format: &Format<Aug>,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let value = match format {
        Format::Bytes => DataEncoding::Bytes,
        Format::Avro(schema) => {
            let Schema {
                key_schema,
                value_schema,
                key_reference_schemas,
                value_reference_schemas,
                csr_connection,
                confluent_wire_format,
            } = match schema {
                // TODO(jldlaughlin): we need a way to pass in primary key information
                // when building a source from a string or file.
                AvroSchema::InlineSchema {
                    schema: ast::Schema { schema },
                    with_options,
                } => {
                    let AvroSchemaOptionExtracted {
                        confluent_wire_format,
                        ..
                    } = with_options.clone().try_into()?;

                    // Inline schemas only describe the value; there is no key
                    // schema and no registry connection.
                    Schema {
                        key_schema: None,
                        value_schema: schema.clone(),
                        key_reference_schemas: vec![],
                        value_reference_schemas: vec![],
                        csr_connection: None,
                        confluent_wire_format,
                    }
                }
                AvroSchema::Csr {
                    csr_connection:
                        CsrConnectionAvro {
                            connection,
                            seed,
                            key_strategy: _,
                            value_strategy: _,
                        },
                } => {
                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
                    let csr_connection = match item.connection()? {
                        Connection::Csr(_) => item.id(),
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    // The schemas themselves come from the seed, which must
                    // already be filled in by the time we plan.
                    if let Some(seed) = seed {
                        Schema {
                            key_schema: seed.key_schema.clone(),
                            value_schema: seed.value_schema.clone(),
                            key_reference_schemas: seed.key_reference_schemas.clone(),
                            value_reference_schemas: seed.value_reference_schemas.clone(),
                            csr_connection: Some(csr_connection),
                            confluent_wire_format: true,
                        }
                    } else {
                        sql_bail!("Avro CSR seed resolution has not been performed")
                    }
                }
            };

            // Map the legacy (csr_connection, confluent_wire_format) pair to
            // the unified `WireFormat`. `(Some, false)` is unreachable in
            // practice (the planner only sets `confluent_wire_format = false`
            // on inline schemas, which never carry a CSR connection) but is
            // rejected explicitly here to keep the invariant local.
            let wire_format = match (csr_connection.clone(), confluent_wire_format) {
                (None, false) => WireFormat::None,
                (None, true) => WireFormat::Confluent { registry: None },
                (Some(c), true) => WireFormat::Confluent { registry: Some(c) },
                (Some(_), false) => {
                    sql_bail!(
                        "internal error: AVRO source has CSR connection but \
                         CONFLUENT WIRE FORMAT = false"
                    )
                }
            };

            // Only a CSR seed can supply a key schema. When it does, return the
            // key and value encodings together, both sharing the same wire
            // format.
            if let Some(key_schema) = key_schema {
                return Ok(SourceDataEncoding {
                    key: Some(DataEncoding::Avro(AvroEncoding {
                        schema: key_schema,
                        reference_schemas: key_reference_schemas,
                        wire_format: wire_format.clone(),
                    })),
                    value: DataEncoding::Avro(AvroEncoding {
                        schema: value_schema,
                        reference_schemas: value_reference_schemas,
                        wire_format,
                    }),
                });
            } else {
                DataEncoding::Avro(AvroEncoding {
                    schema: value_schema,
                    reference_schemas: value_reference_schemas,
                    wire_format,
                })
            }
        }
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr {
                csr_connection:
                    CsrConnectionProtobuf {
                        connection:
                            CsrConnection {
                                connection,
                                options,
                            },
                        seed,
                    },
            } => {
                if let Some(CsrSeedProtobuf { key, value }) = seed {
                    // Resolved only to check that it is a schema registry
                    // connection; the connection itself is discarded.
                    let item = scx.get_item_by_resolved_name(connection)?;
                    let _ = match item.connection()? {
                        Connection::Csr(connection) => connection,
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if !options.is_empty() {
                        sql_bail!("Protobuf CSR connections do not support any options");
                    }

                    let value = DataEncoding::Protobuf(ProtobufEncoding {
                        descriptors: strconv::parse_bytes(&value.schema)?,
                        message_name: value.message_name.clone(),
                        confluent_wire_format: true,
                    });
                    // A seed with a key schema yields a key/value pair.
                    if let Some(key) = key {
                        return Ok(SourceDataEncoding {
                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
                                descriptors: strconv::parse_bytes(&key.schema)?,
                                message_name: key.message_name.clone(),
                                confluent_wire_format: true,
                            })),
                            value,
                        });
                    }
                    value
                } else {
                    sql_bail!("Protobuf CSR seed resolution has not been performed")
                }
            }
            ProtobufSchema::InlineSchema {
                message_name,
                schema: ast::Schema { schema },
            } => {
                let descriptors = strconv::parse_bytes(schema)?;

                DataEncoding::Protobuf(ProtobufEncoding {
                    descriptors,
                    message_name: message_name.to_owned(),
                    confluent_wire_format: false,
                })
            }
        },
        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
            regex: mz_repr::adt::regex::Regex::new(regex, false)
                .map_err(|e| sql_err!("parsing regex: {e}"))?,
        }),
        Format::Csv { columns, delimiter } => {
            let columns = match columns {
                CsvColumns::Header { names } => {
                    if names.is_empty() {
                        sql_bail!("[internal error] column spec should get names in purify")
                    }
                    ColumnSpec::Header {
                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
                    }
                }
                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
            };
            DataEncoding::Csv(CsvEncoding {
                columns,
                // NOTE(review): if `delimiter` is a `char`, `u8::try_from`
                // accepts every code point up to U+00FF. Non-ASCII Latin-1
                // characters would then pass despite the error message saying
                // "ASCII". Tightening this may affect already-created sources
                // if they are re-planned, so confirm before changing it.
                delimiter: u8::try_from(*delimiter)
                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
            })
        }
        Format::Json { array: false } => DataEncoding::Json,
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
        Format::Text => DataEncoding::Text,
    };
    Ok(SourceDataEncoding { key: None, value })
}
2514
2515/// Extract the key envelope, if it is requested
2516fn get_key_envelope(
2517    included_items: &[SourceIncludeMetadata],
2518    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2519    key_envelope_no_encoding: bool,
2520) -> Result<KeyEnvelope, PlanError> {
2521    let key_definition = included_items
2522        .iter()
2523        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2524    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2525        match (alias, encoding.and_then(|e| e.key.as_ref())) {
2526            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2527            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2528            (Some(name), _) if key_envelope_no_encoding => {
2529                Ok(KeyEnvelope::Named(name.as_str().to_string()))
2530            }
2531            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2532            (_, None) => {
2533                // `kd.alias` == `None` means `INCLUDE KEY`
2534                // `kd.alias` == `Some(_) means INCLUDE KEY AS ___`
2535                // These both make sense with the same error message
2536                sql_bail!(
2537                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2538                        got bare FORMAT"
2539                );
2540            }
2541        }
2542    } else {
2543        Ok(KeyEnvelope::None)
2544    }
2545}
2546
2547/// Gets the key envelope for a given key encoding when no name for the key has
2548/// been requested by the user.
2549fn get_unnamed_key_envelope(
2550    key: Option<&DataEncoding<ReferencedConnection>>,
2551) -> Result<KeyEnvelope, PlanError> {
2552    // If the key is requested but comes from an unnamed type then it gets the name "key"
2553    //
2554    // Otherwise it gets the names of the columns in the type
2555    let is_composite = match key {
2556        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2557        Some(
2558            DataEncoding::Avro(_)
2559            | DataEncoding::Csv(_)
2560            | DataEncoding::Protobuf(_)
2561            | DataEncoding::Regex { .. },
2562        ) => true,
2563        None => false,
2564    };
2565
2566    if is_composite {
2567        Ok(KeyEnvelope::Flattened)
2568    } else {
2569        Ok(KeyEnvelope::Named("key".to_string()))
2570    }
2571}
2572
2573pub fn describe_create_view(
2574    _: &StatementContext,
2575    _: CreateViewStatement<Aug>,
2576) -> Result<StatementDesc, PlanError> {
2577    Ok(StatementDesc::new(None))
2578}
2579
2580pub fn plan_view(
2581    scx: &StatementContext,
2582    def: &mut ViewDefinition<Aug>,
2583    temporary: bool,
2584) -> Result<(QualifiedItemName, View), PlanError> {
2585    let create_sql = normalize::create_statement(
2586        scx,
2587        Statement::CreateView(CreateViewStatement {
2588            if_exists: IfExistsBehavior::Error,
2589            temporary,
2590            definition: def.clone(),
2591        }),
2592    )?;
2593
2594    let ViewDefinition {
2595        name,
2596        columns,
2597        query,
2598    } = def;
2599
2600    let query::PlannedRootQuery {
2601        expr,
2602        mut desc,
2603        finishing,
2604        scope: _,
2605    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2606    // We get back a trivial finishing, because `plan_root_query` applies the given finishing.
2607    // Note: Earlier, we were thinking to maybe persist the finishing information with the view
2608    // here to help with database-issues#236. However, in the meantime, there might be a better
2609    // approach to solve database-issues#236:
2610    // https://github.com/MaterializeInc/database-issues/issues/236#issuecomment-1688293709
2611    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2612        &finishing,
2613        expr.arity()
2614    ));
2615    if expr.contains_parameters()? {
2616        return Err(PlanError::ParameterNotAllowed("views".to_string()));
2617    }
2618
2619    let dependencies = expr
2620        .depends_on()
2621        .into_iter()
2622        .map(|gid| scx.catalog.resolve_item_id(&gid))
2623        .collect();
2624
2625    let name = if temporary {
2626        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2627    } else {
2628        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2629    };
2630
2631    plan_utils::maybe_rename_columns(
2632        format!("view {}", scx.catalog.resolve_full_name(&name)),
2633        &mut desc,
2634        columns,
2635    )?;
2636    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2637
2638    if let Some(dup) = names.iter().duplicates().next() {
2639        sql_bail!("column {} specified more than once", dup.quoted());
2640    }
2641
2642    let view = View {
2643        create_sql,
2644        expr,
2645        dependencies,
2646        column_names: names,
2647        temporary,
2648    };
2649
2650    Ok((name, view))
2651}
2652
2653pub fn plan_create_view(
2654    scx: &StatementContext,
2655    mut stmt: CreateViewStatement<Aug>,
2656) -> Result<Plan, PlanError> {
2657    let CreateViewStatement {
2658        temporary,
2659        if_exists,
2660        definition,
2661    } = &mut stmt;
2662    let (name, view) = plan_view(scx, definition, *temporary)?;
2663
2664    // Override the statement-level IfExistsBehavior with Skip if this is
2665    // explicitly requested in the PlanContext (the default is `false`).
2666    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2667
2668    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2669        let if_exists = true;
2670        let cascade = false;
2671        let maybe_item_to_drop = plan_drop_item(
2672            scx,
2673            ObjectType::View,
2674            if_exists,
2675            definition.name.clone(),
2676            cascade,
2677        )?;
2678
2679        // Check if the new View depends on the item that we would be replacing.
2680        if let Some(id) = maybe_item_to_drop {
2681            let dependencies = view.expr.depends_on();
2682            let invalid_drop = scx
2683                .get_item(&id)
2684                .global_ids()
2685                .any(|gid| dependencies.contains(&gid));
2686            if invalid_drop {
2687                let item = scx.catalog.get_item(&id);
2688                sql_bail!(
2689                    "cannot replace view {0}: depended upon by new {0} definition",
2690                    scx.catalog.resolve_full_name(item.name())
2691                );
2692            }
2693
2694            Some(id)
2695        } else {
2696            None
2697        }
2698    } else {
2699        None
2700    };
2701    let drop_ids = replace
2702        .map(|id| {
2703            scx.catalog
2704                .item_dependents(id)
2705                .into_iter()
2706                .map(|id| id.unwrap_item_id())
2707                .collect()
2708        })
2709        .unwrap_or_default();
2710
2711    validate_view_dependencies(scx, &view.dependencies.0)?;
2712
2713    // Check for an object in the catalog with this same name
2714    let full_name = scx.catalog.resolve_full_name(&name);
2715    let partial_name = PartialItemName::from(full_name.clone());
2716    // For PostgreSQL compatibility, we need to prevent creating views when
2717    // there is an existing object *or* type of the same name.
2718    if let (Ok(item), IfExistsBehavior::Error, false) = (
2719        scx.catalog.resolve_item_or_type(&partial_name),
2720        *if_exists,
2721        ignore_if_exists_errors,
2722    ) {
2723        return Err(PlanError::ItemAlreadyExists {
2724            name: full_name.to_string(),
2725            item_type: item.item_type(),
2726        });
2727    }
2728
2729    Ok(Plan::CreateView(CreateViewPlan {
2730        name,
2731        view,
2732        replace,
2733        drop_ids,
2734        if_not_exists: *if_exists == IfExistsBehavior::Skip,
2735        ambiguous_columns: *scx.ambiguous_columns.borrow(),
2736    }))
2737}
2738
2739/// Validate the dependencies of a (materialized) view.
2740fn validate_view_dependencies(
2741    scx: &StatementContext,
2742    dependencies: &BTreeSet<CatalogItemId>,
2743) -> Result<(), PlanError> {
2744    for id in dependencies {
2745        let item = scx.catalog.get_item(id);
2746        if item.replacement_target().is_some() {
2747            let name = scx.catalog.minimal_qualification(item.name());
2748            return Err(PlanError::InvalidDependency {
2749                name: name.to_string(),
2750                item_type: format!("replacement {}", item.item_type()),
2751            });
2752        }
2753    }
2754
2755    Ok(())
2756}
2757
2758pub fn describe_create_materialized_view(
2759    _: &StatementContext,
2760    _: CreateMaterializedViewStatement<Aug>,
2761) -> Result<StatementDesc, PlanError> {
2762    Ok(StatementDesc::new(None))
2763}
2764
2765pub fn describe_create_network_policy(
2766    _: &StatementContext,
2767    _: CreateNetworkPolicyStatement<Aug>,
2768) -> Result<StatementDesc, PlanError> {
2769    Ok(StatementDesc::new(None))
2770}
2771
2772pub fn describe_alter_network_policy(
2773    _: &StatementContext,
2774    _: AlterNetworkPolicyStatement<Aug>,
2775) -> Result<StatementDesc, PlanError> {
2776    Ok(StatementDesc::new(None))
2777}
2778
2779pub fn plan_create_materialized_view(
2780    scx: &StatementContext,
2781    mut stmt: CreateMaterializedViewStatement<Aug>,
2782) -> Result<Plan, PlanError> {
2783    let cluster_id =
2784        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2785    stmt.in_cluster = Some(ResolvedClusterName {
2786        id: cluster_id,
2787        print_name: None,
2788    });
2789
2790    let target_replica = match &stmt.in_cluster_replica {
2791        Some(replica_name) => {
2792            scx.require_feature_flag(&ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS)?;
2793
2794            let cluster = scx.catalog.get_cluster(cluster_id);
2795            let replica_id = cluster
2796                .replica_ids()
2797                .get(replica_name.as_str())
2798                .copied()
2799                .ok_or_else(|| {
2800                    CatalogError::UnknownClusterReplica(replica_name.as_str().to_string())
2801                })?;
2802            Some(replica_id)
2803        }
2804        None => None,
2805    };
2806
2807    let create_sql =
2808        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2809
2810    let partial_name = normalize::unresolved_item_name(stmt.name)?;
2811    let name = scx.allocate_qualified_name(partial_name.clone())?;
2812
2813    let query::PlannedRootQuery {
2814        expr,
2815        mut desc,
2816        finishing,
2817        scope: _,
2818    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2819    // We get back a trivial finishing, see comment in `plan_view`.
2820    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2821        &finishing,
2822        expr.arity()
2823    ));
2824    if expr.contains_parameters()? {
2825        return Err(PlanError::ParameterNotAllowed(
2826            "materialized views".to_string(),
2827        ));
2828    }
2829
2830    plan_utils::maybe_rename_columns(
2831        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2832        &mut desc,
2833        &stmt.columns,
2834    )?;
2835    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2836
2837    let MaterializedViewOptionExtracted {
2838        assert_not_null,
2839        partition_by,
2840        retain_history,
2841        refresh,
2842        seen: _,
2843    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2844
2845    if let Some(partition_by) = partition_by {
2846        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2847        check_partition_by(&desc, partition_by)?;
2848    }
2849
2850    let refresh_schedule = {
2851        let mut refresh_schedule = RefreshSchedule::default();
2852        let mut on_commits_seen = 0;
2853        for refresh_option_value in refresh {
2854            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2855                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2856            }
2857            match refresh_option_value {
2858                RefreshOptionValue::OnCommit => {
2859                    on_commits_seen += 1;
2860                }
2861                RefreshOptionValue::AtCreation => {
2862                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2863                    bail_internal!("REFRESH AT CREATION should have been purified away")
2864                }
2865                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
2866                    transform_ast::transform(scx, &mut time)?; // Desugar the expression
2867                    let ecx = &ExprContext {
2868                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2869                        name: "REFRESH AT",
2870                        scope: &Scope::empty(),
2871                        relation_type: &SqlRelationType::empty(),
2872                        allow_aggregates: false,
2873                        allow_subqueries: false,
2874                        allow_parameters: false,
2875                        allow_windows: false,
2876                    };
2877                    let hir = plan_expr(ecx, &time)?.cast_to(
2878                        ecx,
2879                        CastContext::Assignment,
2880                        &SqlScalarType::MzTimestamp,
2881                    )?;
2882                    // (mz_now was purified away to a literal earlier)
2883                    let timestamp = hir
2884                        .into_literal_mz_timestamp()
2885                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2886                    refresh_schedule.ats.push(timestamp);
2887                }
2888                RefreshOptionValue::Every(RefreshEveryOptionValue {
2889                    interval,
2890                    aligned_to,
2891                }) => {
2892                    let interval = Interval::try_from_value(Value::Interval(interval))?;
2893                    if interval.as_microseconds() <= 0 {
2894                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
2895                    }
2896                    if interval.months != 0 {
2897                        // This limitation is because we want Intervals to be cleanly convertable
2898                        // to a unix epoch timestamp difference. When the interval involves months, then
2899                        // this is not true anymore, because months have variable lengths.
2900                        // See `Timestamp::round_up`.
2901                        sql_bail!("REFRESH interval must not involve units larger than days");
2902                    }
2903                    let interval = interval.duration()?;
2904                    // `Interval::from_duration` (needed to unparse the interval, e.g. for
2905                    // `mz_materialized_view_refresh_strategies`) requires the micros to fit
2906                    // in an i64, which is a tighter bound than `Duration::duration` enforces.
2907                    // Reject too-large intervals here to avoid panicking later.
2908                    if u64::try_from(interval.as_millis()).is_err()
2909                        || Interval::from_duration(&interval).is_err()
2910                    {
2911                        sql_bail!("REFRESH interval too large");
2912                    }
2913                    if interval.as_micros() < 1000 {
2914                        sql_bail!("REFRESH interval must be at least 1 ms")
2915                    }
2916
2917                    let mut aligned_to = match aligned_to {
2918                        Some(aligned_to) => aligned_to,
2919                        None => {
2920                            soft_panic_or_log!(
2921                                "ALIGNED TO should have been filled in by purification"
2922                            );
2923                            sql_bail!(
2924                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2925                            )
2926                        }
2927                    };
2928
2929                    // Desugar the `aligned_to` expression
2930                    transform_ast::transform(scx, &mut aligned_to)?;
2931
2932                    let ecx = &ExprContext {
2933                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2934                        name: "REFRESH EVERY ... ALIGNED TO",
2935                        scope: &Scope::empty(),
2936                        relation_type: &SqlRelationType::empty(),
2937                        allow_aggregates: false,
2938                        allow_subqueries: false,
2939                        allow_parameters: false,
2940                        allow_windows: false,
2941                    };
2942                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2943                        ecx,
2944                        CastContext::Assignment,
2945                        &SqlScalarType::MzTimestamp,
2946                    )?;
2947                    // (mz_now was purified away to a literal earlier)
2948                    let aligned_to_const = aligned_to_hir
2949                        .into_literal_mz_timestamp()
2950                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2951
2952                    refresh_schedule.everies.push(RefreshEvery {
2953                        interval,
2954                        aligned_to: aligned_to_const,
2955                    });
2956                }
2957            }
2958        }
2959
2960        if on_commits_seen > 1 {
2961            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2962        }
2963        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2964            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2965        }
2966
2967        if refresh_schedule == RefreshSchedule::default() {
2968            None
2969        } else {
2970            Some(refresh_schedule)
2971        }
2972    };
2973
2974    let as_of = stmt.as_of.map(Timestamp::from);
2975    let compaction_window = plan_retain_history_option(scx, retain_history)?;
2976    let mut non_null_assertions = assert_not_null
2977        .into_iter()
2978        .map(normalize::column_name)
2979        .map(|assertion_name| {
2980            column_names
2981                .iter()
2982                .position(|col| col == &assertion_name)
2983                .ok_or_else(|| {
2984                    sql_err!(
2985                        "column {} in ASSERT NOT NULL option not found",
2986                        assertion_name.quoted()
2987                    )
2988                })
2989        })
2990        .collect::<Result<Vec<_>, _>>()?;
2991    non_null_assertions.sort();
2992    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2993        let dup = &column_names[*dup];
2994        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2995    }
2996
2997    if let Some(dup) = column_names.iter().duplicates().next() {
2998        sql_bail!("column {} specified more than once", dup.quoted());
2999    }
3000
3001    // Override the statement-level IfExistsBehavior with Skip if this is
3002    // explicitly requested in the PlanContext (the default is `false`).
3003    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
3004        Ok(true) => IfExistsBehavior::Skip,
3005        _ => stmt.if_exists,
3006    };
3007
3008    let mut replace = None;
3009    let mut if_not_exists = false;
3010    match if_exists {
3011        IfExistsBehavior::Replace => {
3012            let if_exists = true;
3013            let cascade = false;
3014            let replace_id = plan_drop_item(
3015                scx,
3016                ObjectType::MaterializedView,
3017                if_exists,
3018                partial_name.clone().into(),
3019                cascade,
3020            )?;
3021
3022            // Check if the new Materialized View depends on the item that we would be replacing.
3023            if let Some(id) = replace_id {
3024                let dependencies = expr.depends_on();
3025                let invalid_drop = scx
3026                    .get_item(&id)
3027                    .global_ids()
3028                    .any(|gid| dependencies.contains(&gid));
3029                if invalid_drop {
3030                    let item = scx.catalog.get_item(&id);
3031                    sql_bail!(
3032                        "cannot replace materialized view {0}: depended upon by new {0} definition",
3033                        scx.catalog.resolve_full_name(item.name())
3034                    );
3035                }
3036                replace = Some(id);
3037            }
3038        }
3039        IfExistsBehavior::Skip => if_not_exists = true,
3040        IfExistsBehavior::Error => (),
3041    }
3042    let drop_ids = replace
3043        .map(|id| {
3044            scx.catalog
3045                .item_dependents(id)
3046                .into_iter()
3047                .map(|id| id.unwrap_item_id())
3048                .collect()
3049        })
3050        .unwrap_or_default();
3051    let mut dependencies: BTreeSet<_> = expr
3052        .depends_on()
3053        .into_iter()
3054        .map(|gid| scx.catalog.resolve_item_id(&gid))
3055        .collect();
3056
3057    // Validate the replacement target, if one is given.
3058    let mut replacement_target = None;
3059    if let Some(target_name) = &stmt.replacement_for {
3060        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
3061
3062        let target = scx.get_item_by_resolved_name(target_name)?;
3063        if target.item_type() != CatalogItemType::MaterializedView {
3064            return Err(PlanError::InvalidReplacement {
3065                item_type: target.item_type(),
3066                item_name: scx.catalog.minimal_qualification(target.name()),
3067                replacement_type: CatalogItemType::MaterializedView,
3068                replacement_name: partial_name,
3069            });
3070        }
3071        if target.id().is_system() {
3072            sql_bail!(
3073                "cannot replace {} because it is required by the database system",
3074                scx.catalog.minimal_qualification(target.name()),
3075            );
3076        }
3077
3078        // Check for dependency cycles.
3079        for dependent in scx.catalog.item_dependents(target.id()) {
3080            if let ObjectId::Item(id) = dependent
3081                && dependencies.contains(&id)
3082            {
3083                sql_bail!(
3084                    "replacement would cause {} to depend on itself",
3085                    scx.catalog.minimal_qualification(target.name()),
3086                );
3087            }
3088        }
3089
3090        dependencies.insert(target.id());
3091
3092        for use_id in target.used_by() {
3093            let use_item = scx.get_item(use_id);
3094            if use_item.replacement_target() == Some(target.id()) {
3095                sql_bail!(
3096                    "cannot replace {} because it already has a replacement: {}",
3097                    scx.catalog.minimal_qualification(target.name()),
3098                    scx.catalog.minimal_qualification(use_item.name()),
3099                );
3100            }
3101        }
3102
3103        replacement_target = Some(target.id());
3104    }
3105
3106    validate_view_dependencies(scx, &dependencies)?;
3107
3108    // Check for an object in the catalog with this same name
3109    let full_name = scx.catalog.resolve_full_name(&name);
3110    let partial_name = PartialItemName::from(full_name.clone());
3111    // For PostgreSQL compatibility, we need to prevent creating materialized
3112    // views when there is an existing object *or* type of the same name.
3113    if let (IfExistsBehavior::Error, Ok(item)) =
3114        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
3115    {
3116        return Err(PlanError::ItemAlreadyExists {
3117            name: full_name.to_string(),
3118            item_type: item.item_type(),
3119        });
3120    }
3121
3122    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
3123        name,
3124        materialized_view: MaterializedView {
3125            create_sql,
3126            expr,
3127            dependencies: DependencyIds(dependencies),
3128            column_names,
3129            replacement_target,
3130            cluster_id,
3131            target_replica,
3132            non_null_assertions,
3133            compaction_window,
3134            refresh_schedule,
3135            as_of,
3136        },
3137        replace,
3138        drop_ids,
3139        if_not_exists,
3140        ambiguous_columns: *scx.ambiguous_columns.borrow(),
3141    }))
3142}
3143
// Typed extraction of `MaterializedViewOption`s (the `ASSERT NOT NULL`,
// `PARTITION BY`, `RETAIN HISTORY`, and `REFRESH` options, judging by the
// variant names). By analogy with `CreateSinkOptionExtracted`, this presumably
// generates a `MaterializedViewOptionExtracted` struct with one field per
// option. `AllowMultiple` marks options that may be specified more than once.
generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);
3151
3152pub fn describe_create_sink(
3153    _: &StatementContext,
3154    _: CreateSinkStatement<Aug>,
3155) -> Result<StatementDesc, PlanError> {
3156    Ok(StatementDesc::new(None))
3157}
3158
// Typed extraction of the `WITH` options of `CREATE SINK`, producing the
// `CreateSinkOptionExtracted` struct that `plan_sink` destructures. Its
// `snapshot`, `version`, and `commit_interval` fields are `Option`s that are
// `None` when the option is absent: `plan_sink` defaults SNAPSHOT to true and
// VERSION to 0, and ignores PARTITION STRATEGY entirely.
generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);
3166
3167pub fn plan_create_sink(
3168    scx: &StatementContext,
3169    stmt: CreateSinkStatement<Aug>,
3170) -> Result<Plan, PlanError> {
3171    // Check for an object in the catalog with this same name
3172    let Some(name) = stmt.name.clone() else {
3173        return Err(PlanError::MissingName(CatalogItemType::Sink));
3174    };
3175    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3176    let full_name = scx.catalog.resolve_full_name(&name);
3177    let partial_name = PartialItemName::from(full_name.clone());
3178    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3179        return Err(PlanError::ItemAlreadyExists {
3180            name: full_name.to_string(),
3181            item_type: item.item_type(),
3182        });
3183    }
3184
3185    plan_sink(scx, stmt)
3186}
3187
/// This function will plan a sink as if it does not exist in the catalog. This is so the planning
/// logic is reused by both CREATE SINK and ALTER SINK planning. It is the responsibility of the
/// callers (plan_create_sink and plan_alter_sink) to check for name collisions if this is
/// important.
///
/// Planning proceeds in these steps, and the first failing check determines the error:
/// 1. Map the AST `ENVELOPE` (Kafka) or `MODE` (Iceberg) clause onto a `SinkEnvelope`,
///    rejecting the clause that does not belong to the connection type.
/// 2. Validate the `FROM` item: only tables, sources, and materialized views that are not
///    replacements are accepted, and system (catalog) objects are rejected.
/// 3. Resolve the optional `KEY` columns (no duplicates, must exist, must be unambiguous,
///    Iceberg type allow-list, and for upsert sinks whether they cover a known relation key).
/// 4. Enforce the `MODE APPEND` restrictions and validate the optional Kafka `HEADERS` column.
/// 5. Extract the `WITH` options, build the connection-specific sink connection, and normalize
///    the statement into `create_sql`.
///
/// # Errors
///
/// Returns a `PlanError` for any of the validation failures above, or whatever error the
/// connection-specific builder reports for its options.
fn plan_sink(
    scx: &StatementContext,
    mut stmt: CreateSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Destructure a clone so that `stmt` stays intact: it is normalized into
    // `create_sql` at the end (after `in_cluster` may have been rewritten).
    let CreateSinkStatement {
        name,
        in_cluster: _,
        from,
        connection,
        format,
        envelope,
        mode,
        if_not_exists,
        with_options,
    } = stmt.clone();

    let Some(name) = name else {
        return Err(PlanError::MissingName(CatalogItemType::Sink));
    };
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    // Each connection type has its own clause for choosing the envelope: it must be
    // present, and the other connection type's clause must not be used.
    let envelope = match (&connection, envelope, mode) {
        // Kafka sinks use ENVELOPE
        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Upsert), None) => {
            SinkEnvelope::Upsert
        }
        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Debezium), None) => {
            SinkEnvelope::Debezium
        }
        (CreateSinkConnection::Kafka { .. }, None, None) => {
            sql_bail!("ENVELOPE clause is required")
        }
        (CreateSinkConnection::Kafka { .. }, _, Some(_)) => {
            sql_bail!("MODE is not supported for Kafka sinks, use ENVELOPE instead")
        }
        // Iceberg sinks use MODE
        (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Upsert)) => {
            SinkEnvelope::Upsert
        }
        (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Append)) => {
            SinkEnvelope::Append
        }
        (CreateSinkConnection::Iceberg { .. }, None, None) => {
            sql_bail!("MODE clause is required")
        }
        (CreateSinkConnection::Iceberg { .. }, Some(_), _) => {
            sql_bail!("ENVELOPE is not supported for Iceberg sinks, use MODE instead")
        }
    };

    // Keep the resolved AST name for error messages; `from` is shadowed by the
    // catalog item on the next line.
    let from_name = &from;
    let from = scx.get_item_by_resolved_name(&from)?;

    // Only tables, sources, and materialized views can feed a sink, and not while
    // they are acting as a replacement for another item.
    {
        use CatalogItemType::*;
        match from.item_type() {
            Table | Source | MaterializedView => {
                if from.replacement_target().is_some() {
                    let name = scx.catalog.minimal_qualification(from.name());
                    return Err(PlanError::InvalidSinkFrom {
                        name: name.to_string(),
                        item_type: format!("replacement {}", from.item_type()),
                    });
                }
            }
            Sink | View | Index | Type | Func | Secret | Connection => {
                let name = scx.catalog.minimal_qualification(from.name());
                return Err(PlanError::InvalidSinkFrom {
                    name: name.to_string(),
                    item_type: from.item_type().to_string(),
                });
            }
        }
    }

    // Sinks may not read directly from system (catalog) objects.
    if from.id().is_system() {
        bail_unsupported!("creating a sink directly on a catalog object");
    }

    let desc = from
        .relation_desc()
        .ok_or_else(|| sql_err!("item does not have a relation description"))?;
    // Resolve the user-specified KEY columns, if any, to column indices in `desc`.
    let key_indices = match &connection {
        CreateSinkConnection::Kafka { key: Some(key), .. }
        | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
            let key_columns = key
                .key_columns
                .clone()
                .into_iter()
                .map(normalize::column_name)
                .collect::<Vec<_>>();
            // A KEY may not name the same column twice.
            let mut uniq = BTreeSet::new();
            for col in key_columns.iter() {
                if !uniq.insert(col) {
                    sql_bail!("duplicate column referenced in KEY: {}", col);
                }
            }
            // Every KEY column must exist in `desc` and resolve unambiguously.
            let indices = key_columns
                .iter()
                .map(|col| {
                    let name_idx =
                        desc.get_by_name(col)
                            .map(|(idx, _type)| idx)
                            .ok_or_else(|| {
                                sql_err!("column referenced in KEY does not exist: {}", col)
                            })?;
                    if desc.get_unambiguous_name(name_idx).is_none() {
                        sql_bail!("column referenced in KEY is ambiguous: {}", col);
                    }
                    Ok(name_idx)
                })
                .collect::<Result<Vec<_>, _>>()?;

            // Iceberg equality deletes require primitive, non-float key columns.
            // Use an allow-list so that new types are rejected by default.
            // NOTE(review): the list also admits Materialize types such as Jsonb,
            // Int2Vector, and the ACL item types, which are not Iceberg primitives;
            // presumably they are encoded as a primitive Iceberg type. Confirm.
            if matches!(&connection, CreateSinkConnection::Iceberg { .. }) {
                let cols: Vec<_> = desc.iter().collect();
                for &idx in &indices {
                    let (col_name, col_type) = cols[idx];
                    let scalar = &col_type.scalar_type;
                    let is_valid = matches!(
                        scalar,
                        // booleans and integers
                        SqlScalarType::Bool
                            | SqlScalarType::Int16
                            | SqlScalarType::Int32
                            | SqlScalarType::Int64
                            | SqlScalarType::UInt16
                            | SqlScalarType::UInt32
                            | SqlScalarType::UInt64
                            // decimal / numeric
                            | SqlScalarType::Numeric { .. }
                            // date / time
                            | SqlScalarType::Date
                            | SqlScalarType::Time
                            | SqlScalarType::Timestamp { .. }
                            | SqlScalarType::TimestampTz { .. }
                            | SqlScalarType::Interval
                            | SqlScalarType::MzTimestamp
                            // string-like
                            | SqlScalarType::String
                            | SqlScalarType::Char { .. }
                            | SqlScalarType::VarChar { .. }
                            | SqlScalarType::PgLegacyChar
                            | SqlScalarType::PgLegacyName
                            | SqlScalarType::Bytes
                            | SqlScalarType::Jsonb
                            // identifiers
                            | SqlScalarType::Uuid
                            | SqlScalarType::Oid
                            | SqlScalarType::RegProc
                            | SqlScalarType::RegType
                            | SqlScalarType::RegClass
                            | SqlScalarType::MzAclItem
                            | SqlScalarType::AclItem
                            | SqlScalarType::Int2Vector
                    );
                    if !is_valid {
                        return Err(PlanError::IcebergSinkUnsupportedKeyType {
                            column: col_name.to_string(),
                            column_type: format!("{:?}", scalar),
                        });
                    }
                }
            }

            // The KEY is "valid" when it contains every column of at least one of
            // the relation's known keys.
            let is_valid_key = desc
                .typ()
                .keys
                .iter()
                .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));

            // Upsert sinks need a valid key. If `key.not_enforced` is set, the user has
            // opted out of this check and only a notice is emitted.
            if !is_valid_key && envelope == SinkEnvelope::Upsert {
                if key.not_enforced {
                    scx.catalog
                        .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
                            key: key_columns.clone(),
                            name: name.item.clone(),
                        })
                } else {
                    return Err(PlanError::UpsertSinkWithInvalidKey {
                        name: from_name.full_name_str(),
                        desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
                        valid_keys: desc
                            .typ()
                            .keys
                            .iter()
                            .map(|key| {
                                key.iter()
                                    .map(|col| desc.get_name(*col).as_str().into())
                                    .collect()
                            })
                            .collect(),
                    });
                }
            }
            Some(indices)
        }
        CreateSinkConnection::Kafka { key: None, .. }
        | CreateSinkConnection::Iceberg { key: None, .. } => None,
    };

    // `SinkEnvelope::Append` is only produced for Iceberg sinks (see the envelope
    // match above), hence the Iceberg-specific message.
    if key_indices.is_some() && envelope == SinkEnvelope::Append {
        sql_bail!("KEY is not supported for MODE APPEND Iceberg sinks");
    }

    // Reject input columns that clash with the columns MODE APPEND adds to the Iceberg table.
    if envelope == SinkEnvelope::Append {
        if let CreateSinkConnection::Iceberg { .. } = &connection {
            use mz_storage_types::sinks::{
                ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN,
            };
            for (col_name, _) in desc.iter() {
                if col_name.as_str() == ICEBERG_APPEND_DIFF_COLUMN
                    || col_name.as_str() == ICEBERG_APPEND_TIMESTAMP_COLUMN
                {
                    sql_bail!(
                        "column {} conflicts with the system column that MODE APPEND \
                         adds to the Iceberg table",
                        col_name.quoted()
                    );
                }
            }
        }
    }

    // Validate the optional Kafka HEADERS column. It is feature-flagged, not allowed
    // with ENVELOPE DEBEZIUM, and must be an unambiguous column of map type whose
    // values are text or bytea.
    let headers_index = match &connection {
        CreateSinkConnection::Kafka {
            headers: Some(headers),
            ..
        } => {
            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;

            match envelope {
                SinkEnvelope::Upsert | SinkEnvelope::Append => (),
                SinkEnvelope::Debezium => {
                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
                }
            };

            let headers = normalize::column_name(headers.clone());
            let (idx, ty) = desc
                .get_by_name(&headers)
                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;

            if desc.get_unambiguous_name(idx).is_none() {
                sql_bail!("HEADERS column ({}) is ambiguous", headers);
            }

            match &ty.scalar_type {
                SqlScalarType::Map { value_type, .. }
                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
                _ => sql_bail!(
                    "HEADERS column must have type map[text => text] or map[text => bytea]"
                ),
            }

            Some(idx)
        }
        _ => None,
    };

    // pick the first valid natural relation key, if any
    let relation_key_indices = desc.typ().keys.get(0).cloned();

    // Project the KEY columns out of `desc` into their own relation description,
    // paired with the indices they were taken from.
    let key_desc_and_indices = key_indices.map(|key_indices| {
        let cols = desc
            .iter()
            .map(|(name, ty)| (name.clone(), ty.clone()))
            .collect::<Vec<_>>();
        let (names, types): (Vec<_>, Vec<_>) =
            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
        let typ = SqlRelationType::new(types);
        (RelationDesc::new(typ, names), key_indices)
    });

    // Upsert sinks cannot be planned without a KEY.
    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
        return Err(PlanError::UpsertSinkWithoutKey);
    }

    // PARTITION STRATEGY is extracted but not used here.
    let CreateSinkOptionExtracted {
        snapshot,
        version,
        partition_strategy: _,
        seen: _,
        commit_interval,
    } = with_options.try_into()?;

    // Build the connection-specific part of the sink. Each builder validates its own
    // options; for example, COMMIT INTERVAL is rejected for Kafka and required for Iceberg.
    let connection_builder = match connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            ..
        } => kafka_sink_builder(
            scx,
            connection,
            options,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            desc.into_owned(),
            envelope,
            from.id(),
            commit_interval,
        )?,
        CreateSinkConnection::Iceberg {
            catalog_connection,
            aws_connection,
            options,
            ..
        } => iceberg_sink_builder(
            scx,
            catalog_connection,
            aws_connection,
            options,
            relation_key_indices,
            key_desc_and_indices,
            commit_interval,
            &desc,
        )?,
    };

    // WITH SNAPSHOT defaults to true
    let with_snapshot = snapshot.unwrap_or(true);
    // VERSION defaults to 0
    let version = version.unwrap_or(0);

    // We will rewrite the cluster if one is not provided, so we must use the
    // `in_cluster` value we plan to normalize when we canonicalize the create
    // statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;

    Ok(Plan::CreateSink(CreateSinkPlan {
        name,
        sink: Sink {
            create_sql,
            from: from.global_id(),
            connection: connection_builder,
            envelope,
            version,
            commit_interval,
        },
        with_snapshot,
        if_not_exists,
        in_cluster: in_cluster.id(),
    }))
}
3541
3542fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3543    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3544
3545    let existing_keys = desc
3546        .typ()
3547        .keys
3548        .iter()
3549        .map(|key_columns| {
3550            key_columns
3551                .iter()
3552                .map(|col| desc.get_name(*col).as_str())
3553                .join(", ")
3554        })
3555        .join(", ");
3556
3557    sql_err!(
3558        "Key constraint ({}) conflicts with existing key ({})",
3559        user_keys,
3560        existing_keys
3561    )
3562}
3563
/// Creating this by hand instead of using generate_extracted_config! macro
/// because the macro doesn't support parameterized enums. See <https://github.com/MaterializeInc/database-issues/issues/6698>
///
/// Typed CSR configuration extracted from a list of `CsrConfigOption`s by the
/// `TryFrom` impl below. CSR is presumably the Confluent Schema Registry, given
/// the `mz_ccsr` types.
#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    /// Option names already processed; used to reject options given more than once.
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    /// Value of the `AvroKeyFullname` option, if specified.
    pub(crate) avro_key_fullname: Option<String>,
    /// Value of the `AvroValueFullname` option, if specified.
    pub(crate) avro_value_fullname: Option<String>,
    /// Value of the `NullDefaults` option; `false` when the option is absent.
    pub(crate) null_defaults: bool,
    /// `DOC ON` comments for the value schema: value-only comments, plus
    /// all-schema comments for targets that have no value-only comment.
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    /// `DOC ON` comments for the key schema: key-only comments, plus
    /// all-schema comments for targets that have no key-only comment.
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    /// Compatibility level requested for the key schema, if specified
    /// (the string value is upper-cased before parsing).
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    /// Compatibility level requested for the value schema, if specified
    /// (the string value is upper-cased before parsing).
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}
3577
3578impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3579    type Error = crate::plan::PlanError;
3580    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3581        let mut extracted = CsrConfigOptionExtracted::default();
3582        let mut common_doc_comments = BTreeMap::new();
3583        for option in v {
3584            if !extracted.seen.insert(option.name.clone()) {
3585                return Err(PlanError::Unstructured({
3586                    format!("{} specified more than once", option.name)
3587                }));
3588            }
3589            let option_name = option.name.clone();
3590            let option_name_str = option_name.to_ast_string_simple();
3591            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3592                option_name: option_name.to_ast_string_simple(),
3593                err: e.into(),
3594            };
3595            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3596                val.map(|s| match s {
3597                    WithOptionValue::Value(Value::String(s)) => {
3598                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3599                    }
3600                    _ => Err("must be a string".to_string()),
3601                })
3602                .transpose()
3603                .map_err(PlanError::Unstructured)
3604                .map_err(better_error)
3605            };
3606            match option.name {
3607                CsrConfigOptionName::AvroKeyFullname => {
3608                    extracted.avro_key_fullname =
3609                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3610                }
3611                CsrConfigOptionName::AvroValueFullname => {
3612                    extracted.avro_value_fullname =
3613                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3614                }
3615                CsrConfigOptionName::NullDefaults => {
3616                    extracted.null_defaults =
3617                        <bool>::try_from_value(option.value).map_err(better_error)?;
3618                }
3619                CsrConfigOptionName::AvroDocOn(doc_on) => {
3620                    let value = String::try_from_value(option.value.ok_or_else(|| {
3621                        PlanError::InvalidOptionValue {
3622                            option_name: option_name_str,
3623                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3624                        }
3625                    })?)
3626                    .map_err(better_error)?;
3627                    let key = match doc_on.identifier {
3628                        DocOnIdentifier::Column(ast::ColumnName {
3629                            relation: ResolvedItemName::Item { id, .. },
3630                            column: ResolvedColumnReference::Column { name, index: _ },
3631                        }) => DocTarget::Field {
3632                            object_id: id,
3633                            column_name: name,
3634                        },
3635                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3636                            DocTarget::Type(id)
3637                        }
3638                        _ => sql_bail!("invalid DOC ON identifier"),
3639                    };
3640
3641                    match doc_on.for_schema {
3642                        DocOnSchema::KeyOnly => {
3643                            extracted.key_doc_options.insert(key, value);
3644                        }
3645                        DocOnSchema::ValueOnly => {
3646                            extracted.value_doc_options.insert(key, value);
3647                        }
3648                        DocOnSchema::All => {
3649                            common_doc_comments.insert(key, value);
3650                        }
3651                    }
3652                }
3653                CsrConfigOptionName::KeyCompatibilityLevel => {
3654                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3655                }
3656                CsrConfigOptionName::ValueCompatibilityLevel => {
3657                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3658                }
3659            }
3660        }
3661
3662        for (key, value) in common_doc_comments {
3663            if !extracted.key_doc_options.contains_key(&key) {
3664                extracted.key_doc_options.insert(key.clone(), value.clone());
3665            }
3666            if !extracted.value_doc_options.contains_key(&key) {
3667                extracted.value_doc_options.insert(key, value);
3668            }
3669        }
3670        Ok(extracted)
3671    }
3672}
3673
3674fn iceberg_sink_builder(
3675    scx: &StatementContext,
3676    catalog_connection: ResolvedItemName,
3677    storage_connection: Option<ResolvedItemName>,
3678    options: Vec<IcebergSinkConfigOption<Aug>>,
3679    relation_key_indices: Option<Vec<usize>>,
3680    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3681    commit_interval: Option<Duration>,
3682    desc: &RelationDesc,
3683) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3684    // Reject types that arrow-rs's parquet writer cannot handle, before
3685    // sink creation. Pass the iceberg overrides so types iceberg remaps
3686    // (e.g. interval -> string) don't trip the check.
3687    ArrowBuilder::validate_desc_for_parquet(desc, iceberg_type_overrides)
3688        .map_err(|e| sql_err!("{}", e))?;
3689
3690    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3691    let catalog_connection_id = catalog_connection_item.id();
3692    if !matches!(
3693        catalog_connection_item.connection()?,
3694        Connection::IcebergCatalog(_)
3695    ) {
3696        sql_bail!(
3697            "{} is not an iceberg catalog connection",
3698            scx.catalog
3699                .resolve_full_name(catalog_connection_item.name())
3700                .to_string()
3701                .quoted()
3702        );
3703    };
3704
3705    let storage_connection_item = storage_connection
3706        .map(|c| scx.get_item_by_resolved_name(&c))
3707        .transpose()?;
3708    let storage_connection_id = storage_connection_item.as_ref().map(|c| c.id());
3709    if let Some(c) = &storage_connection_item
3710        && !matches!(c.connection()?, Connection::Aws(_))
3711    {
3712        sql_bail!(
3713            "{} is not an AWS connection",
3714            scx.catalog.resolve_full_name(c.name()).to_string().quoted()
3715        );
3716    }
3717
3718    let IcebergSinkConfigOptionExtracted {
3719        table,
3720        namespace,
3721        seen: _,
3722    }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3723
3724    let Some(table) = table else {
3725        sql_bail!("Iceberg sink must specify TABLE");
3726    };
3727    let Some(namespace) = namespace else {
3728        sql_bail!("Iceberg sink must specify NAMESPACE");
3729    };
3730    if commit_interval.is_none() {
3731        sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
3732    }
3733
3734    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3735        catalog_connection_id,
3736        catalog_connection: catalog_connection_id,
3737        storage_connection_id,
3738        storage_connection: storage_connection_id,
3739        table,
3740        namespace,
3741        relation_key_indices,
3742        key_desc_and_indices,
3743    }))
3744}
3745
/// Builds the storage sink connection for a sink that writes into a Kafka connection.
///
/// This function:
/// - validates that `connection` names a Kafka connection;
/// - extracts and checks the Kafka sink options (topic, transactional/progress ID
///   prefixes vs. `LEGACY IDS`, topic metadata refresh interval, topic partition count and
///   replication factor);
/// - plans the optional `PARTITION BY` expression against `value_desc`;
/// - maps the statement's format specifier onto separate key and value formats.
///
/// # Errors
///
/// Returns a [`PlanError`] in these cases:
/// - an option is invalid or contradictory;
/// - `commit_interval` is set (`COMMIT INTERVAL` is rejected for Kafka sinks);
/// - a format is unsupported;
/// - no format is given at all.
fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    // Get Kafka connection.
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    if commit_interval.is_some() {
        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
    }

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    // `LEGACY IDS` and an explicit prefix are mutually exclusive. Legacy IDs are used only
    // when requested and no prefix was given. Otherwise the (possibly absent) prefix is used.
    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    // Same resolution rules as for the transactional ID above.
    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    if topic_metadata_refresh_interval > MAX_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL {
        // This is a librdkafka-enforced restriction that, if violated,
        // would result in a runtime error for the sink.
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    } else if topic_metadata_refresh_interval < MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL {
        // We enforce a minimum of 1 second here to prevent excessive refreshes, and ensure that
        // tokio::time::interval receives a valid (positive) duration.
        sql_bail!("TOPIC METADATA REFRESH INTERVAL must be at least 1 second");
    }

    // Validates an optional integer option that must be strictly positive and converts it
    // to `NonNeg`. Both the explicit check and a failed conversion report the same
    // message, naming the option.
    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

    // Helper method to parse avro connection options for format specifiers that use avro
    // for either key or value encoding.
    // Source-only options (SEED, KEY/VALUE STRATEGY) are rejected. The connection must be a
    // schema registry connection. Generated-schema full names are checked against whether
    // the sink has a key.
    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        // With a key, the key and value full names must be given together or not at all.
        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };

    // Maps one statement-level `Format` to the sink format used for either the key
    // (`is_key == true`, described by the key relation) or the value (described by
    // `value_desc`).
    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
        Format::Bytes if desc.arity() == 1 => {
            let col_type = &desc.typ().column_types[0].scalar_type;
            if !mz_pgrepr::Value::can_encode_binary(col_type) {
                bail_unsupported!(format!(
                    "BYTES format with non-encodable type: {:?}",
                    col_type
                ));
            }

            Ok(KafkaSinkFormatType::Bytes)
        }
        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
        Format::Bytes | Format::Text => {
            bail_unsupported!("BYTES or TEXT format with multiple columns")
        }
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
            // Key schemas default to the full name "row" and value schemas to "envelope".
            // Only value schemas are wrapped for the Debezium envelope.
            let schema = if is_key {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    false,
                    options.key_doc_options,
                    options.avro_key_fullname.as_deref().unwrap_or("row"),
                    options.null_defaults,
                    Some(sink_from),
                    false,
                )?
                .schema()
                .to_string()
            } else {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    matches!(envelope, SinkEnvelope::Debezium),
                    options.value_doc_options,
                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
                    options.null_defaults,
                    Some(sink_from),
                    true,
                )?
                .schema()
                .to_string()
            };
            Ok(KafkaSinkFormatType::Avro {
                schema,
                compatibility_level: if is_key {
                    options.key_compatibility_level
                } else {
                    options.value_compatibility_level
                },
                wire_format: WireFormat::Confluent {
                    registry: Some(csr_connection),
                },
            })
        }
        format => bail_unsupported!(format!("sink format {:?}", format)),
    };

    // Plan the optional PARTITION BY expression over the value columns, cast to `UInt64`.
    let partition_by = match &partition_by {
        Some(partition_by) => {
            let mut scope = Scope::from_source(None, value_desc.iter_names());

            match envelope {
                SinkEnvelope::Upsert | SinkEnvelope::Append => (),
                SinkEnvelope::Debezium => {
                    // With the Debezium envelope, PARTITION BY may only reference key
                    // columns. Referencing any other column yields
                    // `InvalidPartitionByEnvelopeDebezium`.
                    let key_indices: HashSet<_> = key_desc_and_indices
                        .as_ref()
                        .map(|(_desc, indices)| indices.as_slice())
                        .unwrap_or_default()
                        .into_iter()
                        .collect();
                    for (i, item) in scope.items.iter_mut().enumerate() {
                        if !key_indices.contains(&i) {
                            item.error_if_referenced = Some(|_table, column| {
                                PlanError::InvalidPartitionByEnvelopeDebezium {
                                    column_name: column.to_string(),
                                }
                            });
                        }
                    }
                }
            };

            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "PARTITION BY",
                scope: &scope,
                relation_type: value_desc.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let expr = plan_expr(ecx, partition_by)?.cast_to(
                ecx,
                CastContext::Assignment,
                &SqlScalarType::UInt64,
            )?;
            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;

            Some(expr)
        }
        _ => None,
    };

    // Map from the format specifier of the statement to the individual key/value formats for the sink.
    // A key format is produced only when the sink has a key. A bare format applies to both
    // the key and the value.
    let format = match format {
        Some(FormatSpecifier::KeyValue { key, value }) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(value, &value_desc, false)?,
                key_format,
            }
        }
        Some(FormatSpecifier::Bare(format)) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(format, &value_desc, false)?,
                key_format,
            }
        }
        None => bail_unsupported!("sink without format"),
    };

    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
        connection_id,
        connection: connection_id,
        format,
        topic: topic_name,
        relation_key_indices,
        key_desc_and_indices,
        headers_index,
        value_desc,
        partition_by,
        compression_type,
        progress_group_id,
        transactional_id,
        topic_options: KafkaTopicOptions {
            partition_count: topic_partition_count,
            replication_factor: topic_replication_factor,
            topic_config: topic_config.unwrap_or_default(),
        },
        topic_metadata_refresh_interval,
    }))
}
4037
4038pub fn describe_create_index(
4039    _: &StatementContext,
4040    _: CreateIndexStatement<Aug>,
4041) -> Result<StatementDesc, PlanError> {
4042    Ok(StatementDesc::new(None))
4043}
4044
/// Plans a `CREATE INDEX` statement.
///
/// Only tables, sources, views, and materialized views may be indexed, and only when they
/// are not replacements.
///
/// When no key is given, a "default" index is planned on the relation's default key.
///
/// When no name is given, one is generated from the indexed relation's name:
/// - `<name>_primary_idx` for a default index;
/// - `<name>_<expressions>_idx` for an explicit key.
///
/// Unless `IF NOT EXISTS` was specified, the generated name is passed through
/// `find_available_name`.
///
/// The statement is normalized in place (name, key parts, and cluster filled in) before
/// its `create_sql` is generated, so the stored SQL records these planned choices.
pub fn plan_create_index(
    scx: &StatementContext,
    mut stmt: CreateIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    // These bindings are mutable borrows into `stmt`; several of them are rewritten below
    // to normalize the statement before `create_sql` is generated.
    let CreateIndexStatement {
        name,
        on_name,
        in_cluster,
        key_parts,
        with_options,
        if_not_exists,
    } = &mut stmt;
    let on = scx.get_item_by_resolved_name(on_name)?;

    // Reject item types that cannot be indexed, and replacement items.
    {
        use CatalogItemType::*;
        match on.item_type() {
            Table | Source | View | MaterializedView => {
                if on.replacement_target().is_some() {
                    sql_bail!(
                        "index cannot be created on {} because it is a replacement {}",
                        on_name.full_name_str(),
                        on.item_type(),
                    );
                }
            }
            Sink | Index | Type | Func | Secret | Connection => {
                sql_bail!(
                    "index cannot be created on {} because it is a {}",
                    on_name.full_name_str(),
                    on.item_type(),
                );
            }
        }
    }

    let on_desc = on
        .relation_desc()
        .ok_or_else(|| sql_err!("item does not have a relation description"))?;

    let filled_key_parts = match key_parts {
        Some(kp) => kp.to_vec(),
        None => {
            // `key_parts` is None if we're creating a "default" index.
            // Precompute which column names are unambiguous in a single pass,
            // avoiding the O(n * k) cost of calling get_unambiguous_name per
            // key column.
            let mut name_counts = BTreeMap::new();
            for name in on_desc.iter_names() {
                *name_counts.entry(name).or_insert(0usize) += 1;
            }
            let key = on_desc.typ().default_key();
            // Refer to each key column by name when that name is unique in the relation,
            // and otherwise by its 1-based column position.
            key.iter()
                .map(|i| {
                    let name = on_desc.get_name(*i);
                    if name_counts.get(name).copied() == Some(1) {
                        Expr::Identifier(vec![name.clone().into()])
                    } else {
                        Expr::Value(Value::Number((i + 1).to_string()))
                    }
                })
                .collect()
        }
    };
    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;

    let index_name = if let Some(name) = name {
        QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: normalize::ident(name.clone()),
        }
    } else {
        let mut idx_name = QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: on.name().item.clone(),
        };
        if key_parts.is_none() {
            // We're trying to create the "default" index.
            idx_name.item += "_primary_idx";
        } else {
            // Use PG schema for automatically naming indexes:
            // `<table>_<_-separated indexed expressions>_idx`
            let index_name_col_suffix = keys
                .iter()
                .map(|k| match k {
                    mz_expr::MirScalarExpr::Column(i, name) => {
                        match (on_desc.get_unambiguous_name(*i), &name.0) {
                            (Some(col_name), _) => col_name.to_string(),
                            (None, Some(name)) => name.to_string(),
                            (None, None) => format!("{}", i + 1),
                        }
                    }
                    _ => "expr".to_string(),
                })
                .join("_");
            write!(idx_name.item, "_{index_name_col_suffix}_idx")
                .expect("write on strings cannot fail");
            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
        }

        // Without IF NOT EXISTS, let the catalog adjust the generated name to one that
        // is available; with it, keep the name as-is.
        if !*if_not_exists {
            scx.catalog.find_available_name(idx_name)
        } else {
            idx_name
        }
    };

    // Check for an object in the catalog with this same name
    let full_name = scx.catalog.resolve_full_name(&index_name);
    let partial_name = PartialItemName::from(full_name.clone());
    // For PostgreSQL compatibility, we need to prevent creating indexes when
    // there is an existing object *or* type of the same name.
    //
    // Technically, we only need to prevent coexistence of indexes and types
    // that have an associated relation (record types but not list/map types).
    // Enforcing that would be more complicated, though. It's backwards
    // compatible to weaken this restriction in the future.
    //
    // The conflict is reported only when neither IF NOT EXISTS nor the plan context's
    // `ignore_if_exists_errors` is set.
    if let (Ok(item), false, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_not_exists,
        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let options = plan_index_options(scx, with_options.clone())?;
    let cluster_id = match in_cluster {
        None => scx.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };

    // Record the resolved cluster in the statement so the stored SQL names it explicitly.
    *in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    // Normalize `stmt`.
    *name = Some(Ident::new(index_name.item.clone())?);
    *key_parts = Some(filled_key_parts);
    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
    // The compaction window comes from the RETAIN HISTORY option, if present.
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    Ok(Plan::CreateIndex(CreateIndexPlan {
        name: index_name,
        index: Index {
            create_sql,
            on: on.global_id(),
            keys,
            cluster_id,
            compaction_window,
        },
        if_not_exists,
    }))
}
4211
4212pub fn describe_create_type(
4213    _: &StatementContext,
4214    _: CreateTypeStatement<Aug>,
4215) -> Result<StatementDesc, PlanError> {
4216    Ok(StatementDesc::new(None))
4217}
4218
4219pub fn plan_create_type(
4220    scx: &StatementContext,
4221    stmt: CreateTypeStatement<Aug>,
4222) -> Result<Plan, PlanError> {
4223    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4224    let CreateTypeStatement { name, as_type, .. } = stmt;
4225
4226    fn validate_data_type(
4227        scx: &StatementContext,
4228        data_type: ResolvedDataType,
4229        as_type: &str,
4230        key: &str,
4231    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4232        let (id, modifiers) = match data_type {
4233            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4234            _ => sql_bail!(
4235                "CREATE TYPE ... AS {}option {} can only use named data types, but \
4236                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
4237                as_type,
4238                key,
4239                data_type.human_readable_name(),
4240            ),
4241        };
4242
4243        let item = scx.catalog.get_item(&id);
4244        match item.type_details() {
4245            None => sql_bail!(
4246                "{} must be of class type, but received {} which is of class {}",
4247                key,
4248                scx.catalog.resolve_full_name(item.name()),
4249                item.item_type()
4250            ),
4251            Some(CatalogTypeDetails {
4252                typ: CatalogType::Char,
4253                ..
4254            }) => {
4255                bail_unsupported!("embedding char type in a list or map")
4256            }
4257            _ => {
4258                // Validate that the modifiers are actually valid.
4259                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4260
4261                Ok((id, modifiers))
4262            }
4263        }
4264    }
4265
4266    let inner = match as_type {
4267        CreateTypeAs::List { options } => {
4268            let CreateTypeListOptionExtracted {
4269                element_type,
4270                seen: _,
4271            } = CreateTypeListOptionExtracted::try_from(options)?;
4272            let element_type =
4273                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4274            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4275            CatalogType::List {
4276                element_reference: id,
4277                element_modifiers: modifiers,
4278            }
4279        }
4280        CreateTypeAs::Map { options } => {
4281            let CreateTypeMapOptionExtracted {
4282                key_type,
4283                value_type,
4284                seen: _,
4285            } = CreateTypeMapOptionExtracted::try_from(options)?;
4286            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4287            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4288            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4289            let (value_id, value_modifiers) =
4290                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4291            CatalogType::Map {
4292                key_reference: key_id,
4293                key_modifiers,
4294                value_reference: value_id,
4295                value_modifiers,
4296            }
4297        }
4298        CreateTypeAs::Record { column_defs } => {
4299            let mut fields = vec![];
4300            for column_def in column_defs {
4301                let data_type = column_def.data_type;
4302                let key = ident(column_def.name.clone());
4303                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4304                fields.push(CatalogRecordField {
4305                    name: ColumnName::from(key.clone()),
4306                    type_reference: id,
4307                    type_modifiers: modifiers,
4308                });
4309            }
4310            CatalogType::Record { fields }
4311        }
4312    };
4313
4314    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4315
4316    // Check for an object in the catalog with this same name
4317    let full_name = scx.catalog.resolve_full_name(&name);
4318    let partial_name = PartialItemName::from(full_name.clone());
4319    // For PostgreSQL compatibility, we need to prevent creating types when
4320    // there is an existing object *or* type of the same name.
4321    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4322        if item.item_type().conflicts_with_type() {
4323            return Err(PlanError::ItemAlreadyExists {
4324                name: full_name.to_string(),
4325                item_type: item.item_type(),
4326            });
4327        }
4328    }
4329
4330    Ok(Plan::CreateType(CreateTypePlan {
4331        name,
4332        typ: Type { create_sql, inner },
4333    }))
4334}
4335
// Generates `CreateTypeListOptionExtracted`, which `plan_create_type` uses to read the
// `ELEMENT TYPE` option (a resolved data type) of `CREATE TYPE ... AS LIST`.
generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

// Generates `CreateTypeMapOptionExtracted`, which `plan_create_type` uses to read the
// `KEY TYPE` and `VALUE TYPE` options (resolved data types) of `CREATE TYPE ... AS MAP`.
generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);
4343
/// A planned `ALTER ROLE` option: either a set of role attributes or a change to a
/// role-level variable.
#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    /// Role attributes, as produced by attribute planning.
    Attributes(PlannedRoleAttributes),
    /// A role-level variable `SET` or `RESET`.
    Variable(PlannedRoleVariable),
}
4349
/// Role attributes after planning.
///
/// Each field is `None` when the corresponding attribute was not specified.
#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    /// `Some(true)` for `INHERIT`, `Some(false)` for `NOINHERIT`.
    pub inherit: Option<bool>,
    /// The new password, when one was supplied.
    pub password: Option<Password>,
    /// SCRAM iteration count to use with `password`; captured from the
    /// `scram_iterations` system variable when a password is supplied.
    pub scram_iterations: Option<NonZeroU32>,
    /// `nopassword` is set to `Some(true)` when the parser produced a password attribute
    /// without a value (`RoleAttribute::Password(None)`).
    /// This is semantically different from not supplying a password at all,
    /// and allows unsetting a password.
    pub nopassword: Option<bool>,
    /// `Some(true)` for `SUPERUSER`, `Some(false)` for `NOSUPERUSER`.
    pub superuser: Option<bool>,
    /// `Some(true)` for `LOGIN`, `Some(false)` for `NOLOGIN`.
    pub login: Option<bool>,
}
4362
4363fn plan_role_attributes(
4364    options: Vec<RoleAttribute>,
4365    scx: &StatementContext,
4366) -> Result<PlannedRoleAttributes, PlanError> {
4367    let mut planned_attributes = PlannedRoleAttributes {
4368        inherit: None,
4369        password: None,
4370        scram_iterations: None,
4371        superuser: None,
4372        login: None,
4373        nopassword: None,
4374    };
4375
4376    for option in options {
4377        match option {
4378            RoleAttribute::Inherit | RoleAttribute::NoInherit
4379                if planned_attributes.inherit.is_some() =>
4380            {
4381                sql_bail!("conflicting or redundant options");
4382            }
4383            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4384                bail_never_supported!(
4385                    "CREATECLUSTER attribute",
4386                    "sql/create-role/#details",
4387                    "Use system privileges instead."
4388                );
4389            }
4390            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4391                bail_never_supported!(
4392                    "CREATEDB attribute",
4393                    "sql/create-role/#details",
4394                    "Use system privileges instead."
4395                );
4396            }
4397            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4398                bail_never_supported!(
4399                    "CREATEROLE attribute",
4400                    "sql/create-role/#details",
4401                    "Use system privileges instead."
4402                );
4403            }
4404            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4405                sql_bail!("conflicting or redundant options");
4406            }
4407
4408            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4409            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4410            RoleAttribute::Password(password) => {
4411                if let Some(password) = password {
4412                    planned_attributes.password = Some(password.into());
4413                    planned_attributes.scram_iterations =
4414                        Some(scx.catalog.system_vars().scram_iterations())
4415                } else {
4416                    planned_attributes.nopassword = Some(true);
4417                }
4418            }
4419            RoleAttribute::SuperUser => {
4420                if planned_attributes.superuser == Some(false) {
4421                    sql_bail!("conflicting or redundant options");
4422                }
4423                planned_attributes.superuser = Some(true);
4424            }
4425            RoleAttribute::NoSuperUser => {
4426                if planned_attributes.superuser == Some(true) {
4427                    sql_bail!("conflicting or redundant options");
4428                }
4429                planned_attributes.superuser = Some(false);
4430            }
4431            RoleAttribute::Login => {
4432                if planned_attributes.login == Some(false) {
4433                    sql_bail!("conflicting or redundant options");
4434                }
4435                planned_attributes.login = Some(true);
4436            }
4437            RoleAttribute::NoLogin => {
4438                if planned_attributes.login == Some(true) {
4439                    sql_bail!("conflicting or redundant options");
4440                }
4441                planned_attributes.login = Some(false);
4442            }
4443        }
4444    }
4445    if planned_attributes.inherit == Some(false) {
4446        bail_unsupported!("non inherit roles");
4447    }
4448
4449    Ok(planned_attributes)
4450}
4451
/// A planned change to a role-level variable.
#[derive(Debug)]
pub enum PlannedRoleVariable {
    /// Set the variable `name` to `value`.
    Set { name: String, value: VariableValue },
    /// Reset the variable `name`.
    Reset { name: String },
}
4457
4458impl PlannedRoleVariable {
4459    pub fn name(&self) -> &str {
4460        match self {
4461            PlannedRoleVariable::Set { name, .. } => name,
4462            PlannedRoleVariable::Reset { name } => name,
4463        }
4464    }
4465}
4466
4467fn plan_role_variable(
4468    scx: &StatementContext,
4469    variable: SetRoleVar,
4470) -> Result<PlannedRoleVariable, PlanError> {
4471    let plan = match variable {
4472        SetRoleVar::Set { name, value } => {
4473            let name = name.to_string();
4474            let value = scl::plan_set_variable_to(value)?;
4475            // Gate feature-flagged isolation levels, matching the `SET` and
4476            // connection-option paths in `SessionVars::set`.
4477            if let VariableValue::Values(values) = &value {
4478                vars::check_transaction_isolation_feature_flag(
4479                    &name,
4480                    VarInput::SqlSet(values),
4481                    scx.catalog.system_vars(),
4482                )?;
4483            }
4484            PlannedRoleVariable::Set { name, value }
4485        }
4486        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4487            name: name.to_string(),
4488        },
4489    };
4490    Ok(plan)
4491}
4492
4493pub fn describe_create_role(
4494    _: &StatementContext,
4495    _: CreateRoleStatement,
4496) -> Result<StatementDesc, PlanError> {
4497    Ok(StatementDesc::new(None))
4498}
4499
4500pub fn plan_create_role(
4501    scx: &StatementContext,
4502    CreateRoleStatement { name, options }: CreateRoleStatement,
4503) -> Result<Plan, PlanError> {
4504    let attributes = plan_role_attributes(options, scx)?;
4505    Ok(Plan::CreateRole(CreateRolePlan {
4506        name: normalize::ident(name),
4507        attributes: attributes.into(),
4508    }))
4509}
4510
4511pub fn plan_create_network_policy(
4512    ctx: &StatementContext,
4513    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4514) -> Result<Plan, PlanError> {
4515    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4516    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4517
4518    let Some(rule_defs) = policy_options.rules else {
4519        sql_bail!("RULES must be specified when creating network policies.");
4520    };
4521
4522    let mut rules = vec![];
4523    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4524        let NetworkPolicyRuleOptionExtracted {
4525            seen: _,
4526            direction,
4527            action,
4528            address,
4529        } = options.try_into()?;
4530        let (direction, action, address) = match (direction, action, address) {
4531            (Some(direction), Some(action), Some(address)) => (
4532                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4533                NetworkPolicyRuleAction::try_from(action.as_str())?,
4534                PolicyAddress::try_from(address.as_str())?,
4535            ),
4536            (_, _, _) => {
4537                sql_bail!("Direction, Address, and Action must specified when creating a rule")
4538            }
4539        };
4540        rules.push(NetworkPolicyRule {
4541            name: normalize::ident(name),
4542            direction,
4543            action,
4544            address,
4545        });
4546    }
4547
4548    if rules.len()
4549        > ctx
4550            .catalog
4551            .system_vars()
4552            .max_rules_per_network_policy()
4553            .try_into()?
4554    {
4555        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4556    }
4557
4558    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4559        name: normalize::ident(name),
4560        rules,
4561    }))
4562}
4563
4564pub fn plan_alter_network_policy(
4565    ctx: &StatementContext,
4566    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4567) -> Result<Plan, PlanError> {
4568    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4569
4570    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4571    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4572
4573    let Some(rule_defs) = policy_options.rules else {
4574        sql_bail!("RULES must be specified when creating network policies.");
4575    };
4576
4577    let mut rules = vec![];
4578    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4579        let NetworkPolicyRuleOptionExtracted {
4580            seen: _,
4581            direction,
4582            action,
4583            address,
4584        } = options.try_into()?;
4585
4586        let (direction, action, address) = match (direction, action, address) {
4587            (Some(direction), Some(action), Some(address)) => (
4588                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4589                NetworkPolicyRuleAction::try_from(action.as_str())?,
4590                PolicyAddress::try_from(address.as_str())?,
4591            ),
4592            (_, _, _) => {
4593                sql_bail!("Direction, Address, and Action must specified when creating a rule")
4594            }
4595        };
4596        rules.push(NetworkPolicyRule {
4597            name: normalize::ident(name),
4598            direction,
4599            action,
4600            address,
4601        });
4602    }
4603    if rules.len()
4604        > ctx
4605            .catalog
4606            .system_vars()
4607            .max_rules_per_network_policy()
4608            .try_into()?
4609    {
4610        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4611    }
4612
4613    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4614        id: policy.id(),
4615        name: normalize::ident(name),
4616        rules,
4617    }))
4618}
4619
4620pub fn describe_create_cluster(
4621    _: &StatementContext,
4622    _: CreateClusterStatement<Aug>,
4623) -> Result<StatementDesc, PlanError> {
4624    Ok(StatementDesc::new(None))
4625}
4626
// Options accepted by `CREATE CLUSTER`. Each extracted field is an `Option` that is `None` when
// the corresponding option was not given (see the warning below for why that matters).
//
// WARNING:
// DO NOT set any `Default` value here using the built-in mechanism of `generate_extracted_config`!
// These options are also used in ALTER CLUSTER, where not giving an option means that the value of
// that option stays the same. If you were to give a default value here, then not giving that option
// to ALTER CLUSTER would always reset the value of that option to the default.
generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);

// Options of a network policy statement. `RULES` is the only option; `plan_alter_network_policy`
// requires it to be present.
generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);

// Options of a single network policy rule. They are extracted as raw strings; planning requires
// all three and parses them into `NetworkPolicyRuleDirection`, `NetworkPolicyRuleAction`, and
// `PolicyAddress`.
generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);

// Options controlling how an `ALTER CLUSTER` is carried out; currently only `WAIT`.
generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

// Sub-options of an "until ready" wait strategy: a `TIMEOUT` and an `ON TIMEOUT` action.
// NOTE(review): semantics are name-derived; confirm against the ALTER CLUSTER planner.
generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);

// Optimizer features that can be overridden per cluster via `FEATURES (...)`. Each one maps
// onto the same-named field of `OptimizerFeatureOverrides`; the `None` default leaves that
// override unset.
generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
4680
4681/// Convert a [`CreateClusterStatement`] into a [`Plan`].
4682///
4683/// The reverse of [`unplan_create_cluster`].
4684pub fn plan_create_cluster(
4685    scx: &StatementContext,
4686    stmt: CreateClusterStatement<Aug>,
4687) -> Result<Plan, PlanError> {
4688    let plan = plan_create_cluster_inner(scx, stmt)?;
4689
4690    // Roundtrip through unplan and make sure that we end up with the same plan.
4691    if let CreateClusterVariant::Managed(_) = &plan.variant {
4692        let stmt = unplan_create_cluster(scx, plan.clone())
4693            .map_err(|e| PlanError::Replan(e.to_string()))?;
4694        let create_sql = stmt.to_ast_string_stable();
4695        let stmt = parse::parse(&create_sql)
4696            .map_err(|e| PlanError::Replan(e.to_string()))?
4697            .into_element()
4698            .ast;
4699        let (stmt, _resolved_ids) =
4700            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4701        let stmt = match stmt {
4702            Statement::CreateCluster(stmt) => stmt,
4703            stmt => {
4704                return Err(PlanError::Replan(format!(
4705                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4706                )));
4707            }
4708        };
4709        let replan =
4710            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4711        if plan != replan {
4712            return Err(PlanError::Replan(format!(
4713                "replan does not match: plan={plan:?}, replan={replan:?}"
4714            )));
4715        }
4716    }
4717
4718    Ok(Plan::CreateCluster(plan))
4719}
4720
/// Plans a [`CreateClusterStatement`] into a [`CreateClusterPlan`].
///
/// This does the actual planning for [`plan_create_cluster`], without the unplan/replan
/// roundtrip check that function performs for managed clusters.
///
/// A cluster is managed when `MANAGED` is given as true, or, if `MANAGED` is omitted, when no
/// `REPLICAS` option is present.
///
/// - Managed clusters require `SIZE` and reject `REPLICAS`.
/// - Unmanaged clusters require `REPLICAS` and reject:
///   - `AVAILABILITY ZONES`, `REPLICATION FACTOR`, `SIZE`, `DISK`, and `FEATURES`;
///   - `INTROSPECTION DEBUGGING` and `INTROSPECTION INTERVAL`;
///   - any `SCHEDULE` other than `MANUAL`.
/// - `FEATURES` and `WORKLOAD CLASS` are rejected unless the active role ID is a system role.
pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    // Without an explicit `MANAGED` option, the absence of `REPLICAS` implies a managed cluster.
    let managed = managed.unwrap_or_else(|| replicas.is_none());

    // `FEATURES` and `WORKLOAD CLASS` are restricted to system roles for both cluster kinds.
    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    // An omitted `SCHEDULE` means the cluster is scheduled manually.
    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    // Flatten `Option<OptionalString>` into `Option<String>`.
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        if disk.is_some() {
            // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
            // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
            // we'll be able to remove the `DISK` option entirely.
            if scx.catalog.is_cluster_size_cc(&size) {
                sql_bail!(
                    "DISK option not supported for modern cluster sizes because disk is always enabled"
                );
            }

            scx.catalog
                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
        }

        // `INTROSPECTION DEBUGGING` defaults to off when not given.
        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        // Under a MANUAL schedule, an omitted `REPLICATION FACTOR` falls back to the
        // `default_cluster_replication_factor` system variable.
        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            // If we have a non-trivial schedule, then let's not have any replicas initially,
            // to avoid quickly going back and forth if the schedule doesn't want a replica
            // initially.
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        // Explicit availability zones for managed clusters are feature-gated.
        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        // Plan OptimizerFeatureOverrides. Only the features exposed as cluster `FEATURES` are
        // set; every other override keeps its `Default` value.
        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        // Unmanaged clusters are described entirely by their replica definitions, so every
        // managed-only option is rejected.
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        // Normalize each replica's name and plan its options individually.
        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}
4887
/// Convert a [`CreateClusterPlan`] into a [`CreateClusterStatement`].
///
/// The reverse of [`plan_create_cluster`].
///
/// Only managed clusters are supported; unmanaged clusters return an unsupported-feature
/// error.
///
/// The extracted options are filled in as follows:
/// - always set: `MANAGED`, `SIZE`, `SCHEDULE`, and `INTROSPECTION DEBUGGING`;
/// - never set: `DISK` and `REPLICAS`;
/// - left unset for refresh schedules: `REPLICATION FACTOR`.
pub fn unplan_create_cluster(
    scx: &StatementContext,
    CreateClusterPlan {
        name,
        variant,
        workload_class,
    }: CreateClusterPlan,
) -> Result<CreateClusterStatement<Aug>, PlanError> {
    match variant {
        CreateClusterVariant::Managed(CreateClusterManagedPlan {
            replication_factor,
            size,
            availability_zones,
            compute,
            optimizer_feature_overrides,
            schedule,
        }) => {
            let schedule = unplan_cluster_schedule(schedule);
            // Destructure exhaustively (no `..`), so that adding a field to
            // `OptimizerFeatureOverrides` is a compile error here. That forces a decision on
            // whether the new field should be exposed as a cluster feature.
            let OptimizerFeatureOverrides {
                enable_reduce_mfp_fusion: _,
                enable_cardinality_estimates: _,
                persist_fast_path_limit: _,
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: _,
                enable_dequadratic_eqprop_map: _,
                enable_eq_classes_withholding_errors: _,
                enable_fast_path_plan_insights: _,
                enable_cast_elimination: _,
                enable_case_literal_transform: _,
                enable_simplify_quantified_comparisons: _,
                enable_coalesce_case_transform: _,
                enable_will_distinct_propagation: _,
            } = optimizer_feature_overrides;
            // The ones from above that don't occur below are not wired up to cluster features.
            let features_extracted = ClusterFeatureExtracted {
                // Seen is ignored when unplanning.
                seen: Default::default(),
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
            };
            let features = features_extracted.into_values(scx.catalog);
            // An empty zone list is represented as an unset `AVAILABILITY ZONES` option.
            let availability_zones = if availability_zones.is_empty() {
                None
            } else {
                Some(availability_zones)
            };
            let (introspection_interval, introspection_debugging) =
                unplan_compute_replica_config(compute);
            // Replication factor cannot be explicitly specified with a refresh schedule, it's
            // always 1 or less.
            let replication_factor = match &schedule {
                ClusterScheduleOptionValue::Manual => Some(replication_factor),
                ClusterScheduleOptionValue::Refresh { .. } => {
                    // A cluster with a refresh schedule is turned On/Off by the cluster scheduling
                    // policy, so its replication factor should always be 0 or 1, and CREATE/ALTER
                    // reject setting both a non-MANUAL schedule and a higher replication factor. If
                    // we nevertheless find one (e.g., a cluster left in an invalid state by an
                    // older version), log loudly rather than crashing the coordinator: the
                    // replication factor is omitted from the rendered statement regardless.
                    soft_assert_or_log!(
                        replication_factor <= 1,
                        "replication factor, {replication_factor:?}, must be <= 1 with a refresh schedule"
                    );
                    None
                }
            };
            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
            let options_extracted = ClusterOptionExtracted {
                // Seen is ignored when unplanning.
                seen: Default::default(),
                availability_zones,
                disk: None,
                introspection_debugging: Some(introspection_debugging),
                introspection_interval,
                managed: Some(true),
                replicas: None,
                replication_factor,
                size: Some(size),
                schedule: Some(schedule),
                workload_class,
            };
            let options = options_extracted.into_values(scx.catalog);
            let name = Ident::new_unchecked(name);
            Ok(CreateClusterStatement {
                name,
                options,
                features,
            })
        }
        CreateClusterVariant::Unmanaged(_) => {
            bail_unsupported!("SHOW CREATE for unmanaged clusters")
        }
    }
}
4996
// Options of a single replica definition. They are used by `CREATE CLUSTER REPLICA` and by the
// `REPLICAS (...)` of an unmanaged `CREATE CLUSTER` (both via `plan_replica_config`).
// `INTERNAL` and `INTROSPECTION DEBUGGING` default to `false`; every other option is `None`
// unless given.
generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);
5012
/// Plans the options of a single replica definition into a [`ReplicaConfig`].
///
/// A replica takes one of two forms, and combining options from both is an error:
/// - **Orchestrated:** described by `SIZE`, plus optional `AVAILABILITY ZONE`, `BILLED AS`,
///   `INTERNAL`, and the deprecated `DISK`.
/// - **Unorchestrated:** described by `STORAGECTL ADDRESSES` and `COMPUTECTL ADDRESSES`, and
///   gated behind the `UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS` feature flag.
///
/// The introspection options apply to both forms. `COMPUTE ADDRESSES`, `STORAGE ADDRESSES`,
/// and `WORKERS` are accepted by the option extraction but not used here.
fn plan_replica_config(
    scx: &StatementContext,
    options: Vec<ReplicaOption<Aug>>,
) -> Result<ReplicaConfig, PlanError> {
    let ReplicaOptionExtracted {
        availability_zone,
        billed_as,
        computectl_addresses,
        disk,
        internal,
        introspection_debugging,
        introspection_interval,
        size,
        storagectl_addresses,
        // `compute_addresses`, `storage_addresses`, `workers`, and `seen` are ignored.
        ..
    }: ReplicaOptionExtracted = options.try_into()?;

    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;

    // Arm order matters: the all-`None` case is reported as a missing SIZE before the
    // unorchestrated arm (which would otherwise also match it) is considered.
    match (
        size,
        availability_zone,
        billed_as,
        storagectl_addresses,
        computectl_addresses,
    ) {
        // Common cases we expect end users to hit.
        (None, _, None, None, None) => {
            // We don't mention the unmanaged options in the error message
            // because they are only available in unsafe mode.
            sql_bail!("SIZE option must be specified");
        }
        (Some(size), availability_zone, billed_as, None, None) => {
            if disk.is_some() {
                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
                // we'll be able to remove the `DISK` option entirely.
                if scx.catalog.is_cluster_size_cc(&size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }

            Ok(ReplicaConfig::Orchestrated {
                size,
                availability_zone,
                compute,
                billed_as,
                internal,
            })
        }

        (None, None, None, storagectl_addresses, computectl_addresses) => {
            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;

            // When manually testing Materialize in unsafe mode, it's easy to
            // accidentally omit one of these options, so we try to produce
            // helpful error messages.
            let Some(storagectl_addrs) = storagectl_addresses else {
                sql_bail!("missing STORAGECTL ADDRESSES option");
            };
            let Some(computectl_addrs) = computectl_addresses else {
                sql_bail!("missing COMPUTECTL ADDRESSES option");
            };

            if storagectl_addrs.len() != computectl_addrs.len() {
                sql_bail!(
                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
                );
            }

            if disk.is_some() {
                sql_bail!("DISK can't be specified for unorchestrated clusters");
            }

            Ok(ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            })
        }
        _ => {
            // We don't bother trying to produce a more helpful error message
            // here because no user is likely to hit this path.
            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
        }
    }
}
5105
5106/// Convert an [`Option<OptionalDuration>`] and [`bool`] into a [`ComputeReplicaConfig`].
5107///
5108/// The reverse of [`unplan_compute_replica_config`].
5109fn plan_compute_replica_config(
5110    introspection_interval: Option<OptionalDuration>,
5111    introspection_debugging: bool,
5112) -> Result<ComputeReplicaConfig, PlanError> {
5113    let introspection_interval = introspection_interval
5114        .map(|OptionalDuration(i)| i)
5115        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
5116    let introspection = match introspection_interval {
5117        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5118            interval,
5119            debugging: introspection_debugging,
5120        }),
5121        None if introspection_debugging => {
5122            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5123        }
5124        None => None,
5125    };
5126    let compute = ComputeReplicaConfig { introspection };
5127    Ok(compute)
5128}
5129
5130/// Convert a [`ComputeReplicaConfig`] into an [`Option<OptionalDuration>`] and [`bool`].
5131///
5132/// The reverse of [`plan_compute_replica_config`].
5133fn unplan_compute_replica_config(
5134    compute_replica_config: ComputeReplicaConfig,
5135) -> (Option<OptionalDuration>, bool) {
5136    match compute_replica_config.introspection {
5137        Some(ComputeReplicaIntrospectionConfig {
5138            debugging,
5139            interval,
5140        }) => (Some(OptionalDuration(Some(interval))), debugging),
5141        None => (Some(OptionalDuration(None)), false),
5142    }
5143}
5144
5145/// Convert a [`ClusterScheduleOptionValue`] into a [`ClusterSchedule`].
5146///
5147/// The reverse of [`unplan_cluster_schedule`].
5148fn plan_cluster_schedule(
5149    schedule: ClusterScheduleOptionValue,
5150) -> Result<ClusterSchedule, PlanError> {
5151    Ok(match schedule {
5152        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5153        // If `HYDRATION TIME ESTIMATE` is not explicitly given, we default to 0.
5154        ClusterScheduleOptionValue::Refresh {
5155            hydration_time_estimate: None,
5156        } => ClusterSchedule::Refresh {
5157            hydration_time_estimate: Duration::from_millis(0),
5158        },
5159        // Otherwise we convert the `IntervalValue` to a `Duration`.
5160        ClusterScheduleOptionValue::Refresh {
5161            hydration_time_estimate: Some(interval_value),
5162        } => {
5163            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5164            if interval.as_microseconds() < 0 {
5165                sql_bail!(
5166                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5167                    interval
5168                );
5169            }
5170            if interval.months != 0 {
5171                // This limitation is because we want this interval to be cleanly convertable
5172                // to a unix epoch timestamp difference. When the interval involves months, then
5173                // this is not true anymore, because months have variable lengths.
5174                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5175            }
5176            let duration = interval.duration()?;
5177            if u64::try_from(duration.as_millis()).is_err()
5178                || Interval::from_duration(&duration).is_err()
5179            {
5180                sql_bail!("HYDRATION TIME ESTIMATE too large");
5181            }
5182            ClusterSchedule::Refresh {
5183                hydration_time_estimate: duration,
5184            }
5185        }
5186    })
5187}
5188
5189/// Convert a [`ClusterSchedule`] into a [`ClusterScheduleOptionValue`].
5190///
5191/// The reverse of [`plan_cluster_schedule`].
5192fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5193    match schedule {
5194        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5195        ClusterSchedule::Refresh {
5196            hydration_time_estimate,
5197        } => {
5198            let interval = Interval::from_duration(&hydration_time_estimate)
5199                .expect("planning ensured that this is convertible back to Interval");
5200            let interval_value = literal::unplan_interval(&interval);
5201            ClusterScheduleOptionValue::Refresh {
5202                hydration_time_estimate: Some(interval_value),
5203            }
5204        }
5205    }
5206}
5207
5208pub fn describe_create_cluster_replica(
5209    _: &StatementContext,
5210    _: CreateClusterReplicaStatement<Aug>,
5211) -> Result<StatementDesc, PlanError> {
5212    Ok(StatementDesc::new(None))
5213}
5214
5215pub fn plan_create_cluster_replica(
5216    scx: &StatementContext,
5217    CreateClusterReplicaStatement {
5218        definition: ReplicaDefinition { name, options },
5219        of_cluster,
5220    }: CreateClusterReplicaStatement<Aug>,
5221) -> Result<Plan, PlanError> {
5222    let cluster = scx
5223        .catalog
5224        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5225
5226    let config = plan_replica_config(scx, options)?;
5227
5228    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5229        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5230            return Err(PlanError::MangedReplicaName(name.into_string()));
5231        }
5232    } else {
5233        ensure_cluster_is_not_managed(scx, cluster.id())?;
5234    }
5235
5236    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5237        name: normalize::ident(name),
5238        cluster_id: cluster.id(),
5239        config,
5240    }))
5241}
5242
5243pub fn describe_create_secret(
5244    _: &StatementContext,
5245    _: CreateSecretStatement<Aug>,
5246) -> Result<StatementDesc, PlanError> {
5247    Ok(StatementDesc::new(None))
5248}
5249
5250pub fn plan_create_secret(
5251    scx: &StatementContext,
5252    stmt: CreateSecretStatement<Aug>,
5253) -> Result<Plan, PlanError> {
5254    let CreateSecretStatement {
5255        name,
5256        if_not_exists,
5257        value,
5258    } = &stmt;
5259
5260    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5261    let mut create_sql_statement = stmt.clone();
5262    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5263    let create_sql =
5264        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5265    let secret_as = query::plan_secret_as(scx, value.clone())?;
5266
5267    let secret = Secret {
5268        create_sql,
5269        secret_as,
5270    };
5271
5272    Ok(Plan::CreateSecret(CreateSecretPlan {
5273        name,
5274        secret,
5275        if_not_exists: *if_not_exists,
5276    }))
5277}
5278
5279pub fn describe_create_connection(
5280    _: &StatementContext,
5281    _: CreateConnectionStatement<Aug>,
5282) -> Result<StatementDesc, PlanError> {
5283    Ok(StatementDesc::new(None))
5284}
5285
// `WITH (...)` options of `CREATE CONNECTION`; currently only `VALIDATE`. It has no default so
// that `plan_create_connection` can tell an explicit value apart from an omitted one.
generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5287
5288pub fn plan_create_connection(
5289    scx: &StatementContext,
5290    mut stmt: CreateConnectionStatement<Aug>,
5291) -> Result<Plan, PlanError> {
5292    let CreateConnectionStatement {
5293        name,
5294        connection_type,
5295        values,
5296        if_not_exists,
5297        with_options,
5298    } = stmt.clone();
5299    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5300    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5301    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5302
5303    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5304    if options.validate.is_some() {
5305        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5306    }
5307    let validate = match options.validate {
5308        Some(val) => val,
5309        None => {
5310            scx.catalog
5311                .system_vars()
5312                .enable_default_connection_validation()
5313                && details.to_connection().validate_by_default()
5314        }
5315    };
5316
5317    // Check for an object in the catalog with this same name
5318    let full_name = scx.catalog.resolve_full_name(&name);
5319    let partial_name = PartialItemName::from(full_name.clone());
5320    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5321        return Err(PlanError::ItemAlreadyExists {
5322            name: full_name.to_string(),
5323            item_type: item.item_type(),
5324        });
5325    }
5326
5327    // For SSH connections, overwrite the public key options based on the
5328    // connection details, in case we generated new keys during planning.
5329    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5330        stmt.values.retain(|v| {
5331            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5332        });
5333        stmt.values.push(ConnectionOption {
5334            name: ConnectionOptionName::PublicKey1,
5335            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5336        });
5337        stmt.values.push(ConnectionOption {
5338            name: ConnectionOptionName::PublicKey2,
5339            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5340        });
5341    }
5342    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5343
5344    let plan = CreateConnectionPlan {
5345        name,
5346        if_not_exists,
5347        connection: crate::plan::Connection {
5348            create_sql,
5349            details,
5350        },
5351        validate,
5352    };
5353    Ok(Plan::CreateConnection(plan))
5354}
5355
5356fn plan_drop_database(
5357    scx: &StatementContext,
5358    if_exists: bool,
5359    name: &UnresolvedDatabaseName,
5360    cascade: bool,
5361) -> Result<Option<DatabaseId>, PlanError> {
5362    Ok(match resolve_database(scx, name, if_exists)? {
5363        Some(database) => {
5364            if !cascade && database.has_schemas() {
5365                sql_bail!(
5366                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5367                    name,
5368                );
5369            }
5370            Some(database.id())
5371        }
5372        None => None,
5373    })
5374}
5375
5376pub fn describe_drop_objects(
5377    _: &StatementContext,
5378    _: DropObjectsStatement,
5379) -> Result<StatementDesc, PlanError> {
5380    Ok(StatementDesc::new(None))
5381}
5382
5383pub fn plan_drop_objects(
5384    scx: &mut StatementContext,
5385    DropObjectsStatement {
5386        object_type,
5387        if_exists,
5388        names,
5389        cascade,
5390    }: DropObjectsStatement,
5391) -> Result<Plan, PlanError> {
5392    if object_type == mz_sql_parser::ast::ObjectType::Func {
5393        bail_unsupported!("DROP FUNCTION");
5394    }
5395    let object_type = object_type.into();
5396
5397    let mut referenced_ids = Vec::new();
5398    for name in names {
5399        let id = match &name {
5400            UnresolvedObjectName::Cluster(name) => {
5401                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5402            }
5403            UnresolvedObjectName::ClusterReplica(name) => {
5404                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5405            }
5406            UnresolvedObjectName::Database(name) => {
5407                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5408            }
5409            UnresolvedObjectName::Schema(name) => {
5410                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5411            }
5412            UnresolvedObjectName::Role(name) => {
5413                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5414            }
5415            UnresolvedObjectName::Item(name) => {
5416                // Defer the dependency check until all names are resolved, so a
5417                // dependent that is itself being dropped in this same statement
5418                // does not block a non-cascade drop.
5419                plan_drop_item_name(scx, object_type, if_exists, name.clone())?.map(ObjectId::Item)
5420            }
5421            UnresolvedObjectName::NetworkPolicy(name) => {
5422                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5423            }
5424        };
5425        match id {
5426            Some(id) => referenced_ids.push(id),
5427            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5428                name: name.to_ast_string_simple(),
5429                object_type,
5430            }),
5431        }
5432    }
5433
5434    // Now that the full set of explicitly-named items is known, run the
5435    // non-cascade dependency check. A dependent that is itself being dropped in
5436    // this statement does not block the drop, matching PostgreSQL.
5437    if !cascade {
5438        let dropped_items: BTreeSet<CatalogItemId> = referenced_ids
5439            .iter()
5440            .filter_map(|id| match id {
5441                ObjectId::Item(id) => Some(*id),
5442                _ => None,
5443            })
5444            .collect();
5445        for id in &dropped_items {
5446            let catalog_item = scx.catalog.get_item(id);
5447            ensure_no_blocking_dependents(scx, object_type, catalog_item, &dropped_items)?;
5448        }
5449    }
5450
5451    let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5452
5453    Ok(Plan::DropObjects(DropObjectsPlan {
5454        referenced_ids,
5455        drop_ids,
5456        object_type,
5457    }))
5458}
5459
5460fn plan_drop_schema(
5461    scx: &StatementContext,
5462    if_exists: bool,
5463    name: &UnresolvedSchemaName,
5464    cascade: bool,
5465) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5466    // Special case for mz_temp: with lazy temporary schema creation, the temp
5467    // schema may not exist yet, but we still need to return the correct error.
5468    // Check the schema name directly against MZ_TEMP_SCHEMA.
5469    let normalized = normalize::unresolved_schema_name(name.clone())?;
5470    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
5471        sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5472    }
5473
5474    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5475        Some((database_spec, schema_spec)) => {
5476            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5477                sql_bail!(
5478                    "cannot drop schema {name} because it is required by the database system",
5479                );
5480            }
5481            if let SchemaSpecifier::Temporary = schema_spec {
5482                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5483            }
5484            let schema = scx.get_schema(&database_spec, &schema_spec);
5485            if !cascade && schema.has_items() {
5486                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5487                sql_bail!(
5488                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
5489                    full_schema_name
5490                );
5491            }
5492            Some((database_spec, schema_spec))
5493        }
5494        None => None,
5495    })
5496}
5497
5498fn plan_drop_role(
5499    scx: &StatementContext,
5500    if_exists: bool,
5501    name: &Ident,
5502) -> Result<Option<RoleId>, PlanError> {
5503    match scx.catalog.resolve_role(name.as_str()) {
5504        Ok(role) => {
5505            let id = role.id();
5506            if &id == scx.catalog.active_role_id() {
5507                sql_bail!("current role cannot be dropped");
5508            }
5509            for role in scx.catalog.get_roles() {
5510                for (member_id, grantor_id) in role.membership() {
5511                    if &id == grantor_id {
5512                        let member_role = scx.catalog.get_role(member_id);
5513                        sql_bail!(
5514                            "cannot drop role {}: still depended up by membership of role {} in role {}",
5515                            name.as_str(),
5516                            role.name(),
5517                            member_role.name()
5518                        );
5519                    }
5520                }
5521            }
5522            Ok(Some(role.id()))
5523        }
5524        Err(_) if if_exists => Ok(None),
5525        Err(e) => Err(e.into()),
5526    }
5527}
5528
5529fn plan_drop_cluster(
5530    scx: &StatementContext,
5531    if_exists: bool,
5532    name: &Ident,
5533    cascade: bool,
5534) -> Result<Option<ClusterId>, PlanError> {
5535    Ok(match resolve_cluster(scx, name, if_exists)? {
5536        Some(cluster) => {
5537            if !cascade && !cluster.bound_objects().is_empty() {
5538                return Err(PlanError::DependentObjectsStillExist {
5539                    object_type: "cluster".to_string(),
5540                    object_name: cluster.name().to_string(),
5541                    dependents: Vec::new(),
5542                });
5543            }
5544            Some(cluster.id())
5545        }
5546        None => None,
5547    })
5548}
5549
5550fn plan_drop_network_policy(
5551    scx: &StatementContext,
5552    if_exists: bool,
5553    name: &Ident,
5554) -> Result<Option<NetworkPolicyId>, PlanError> {
5555    match scx.catalog.resolve_network_policy(name.as_str()) {
5556        Ok(policy) => {
5557            // TODO(network_policy): When we support role based network policies, check if any role
5558            // currently has the specified policy set.
5559            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5560                Err(PlanError::NetworkPolicyInUse)
5561            } else {
5562                Ok(Some(policy.id()))
5563            }
5564        }
5565        Err(_) if if_exists => Ok(None),
5566        Err(e) => Err(e.into()),
5567    }
5568}
5569
5570fn plan_drop_cluster_replica(
5571    scx: &StatementContext,
5572    if_exists: bool,
5573    name: &QualifiedReplica,
5574) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5575    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5576    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5577}
5578
5579/// Returns the [`CatalogItemId`] of the item we should drop, if it exists.
5580fn plan_drop_item(
5581    scx: &StatementContext,
5582    object_type: ObjectType,
5583    if_exists: bool,
5584    name: UnresolvedItemName,
5585    cascade: bool,
5586) -> Result<Option<CatalogItemId>, PlanError> {
5587    let Some(id) = plan_drop_item_name(scx, object_type, if_exists, name)? else {
5588        return Ok(None);
5589    };
5590    if !cascade {
5591        let catalog_item = scx.catalog.get_item(&id);
5592        ensure_no_blocking_dependents(scx, object_type, catalog_item, &BTreeSet::new())?;
5593    }
5594    Ok(Some(id))
5595}
5596
5597/// Resolves `name` to the [`CatalogItemId`] of the item to drop, performing the
5598/// system-object check but *not* the dependency check. Returns `None` if the
5599/// item does not exist and `if_exists` is set.
5600fn plan_drop_item_name(
5601    scx: &StatementContext,
5602    object_type: ObjectType,
5603    if_exists: bool,
5604    name: UnresolvedItemName,
5605) -> Result<Option<CatalogItemId>, PlanError> {
5606    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5607        Ok(r) => r,
5608        // Return a more helpful error on `DROP VIEW <materialized-view>`.
5609        Err(PlanError::MismatchedObjectType {
5610            name,
5611            is_type: ObjectType::MaterializedView,
5612            expected_type: ObjectType::View,
5613        }) => {
5614            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5615        }
5616        e => e?,
5617    };
5618
5619    Ok(match resolved {
5620        Some(catalog_item) => {
5621            if catalog_item.id().is_system() {
5622                sql_bail!(
5623                    "cannot drop {} {} because it is required by the database system",
5624                    catalog_item.item_type(),
5625                    scx.catalog.minimal_qualification(catalog_item.name()),
5626                );
5627            }
5628            Some(catalog_item.id())
5629        }
5630        None => None,
5631    })
5632}
5633
5634/// Errors if dropping `catalog_item` would leave a dangling dependent, i.e. an
5635/// object that depends on it and is not itself being dropped. Dependents whose
5636/// ids are in `also_dropped` are ignored, since they are being dropped as part
5637/// of the same statement.
5638fn ensure_no_blocking_dependents(
5639    scx: &StatementContext,
5640    object_type: ObjectType,
5641    catalog_item: &dyn CatalogItem,
5642    also_dropped: &BTreeSet<CatalogItemId>,
5643) -> Result<(), PlanError> {
5644    for id in catalog_item.used_by() {
5645        if also_dropped.contains(id) {
5646            continue;
5647        }
5648        let dep = scx.catalog.get_item(id);
5649        if dependency_prevents_drop(object_type, dep) {
5650            return Err(PlanError::DependentObjectsStillExist {
5651                object_type: catalog_item.item_type().to_string(),
5652                object_name: scx
5653                    .catalog
5654                    .minimal_qualification(catalog_item.name())
5655                    .to_string(),
5656                dependents: vec![(
5657                    dep.item_type().to_string(),
5658                    scx.catalog.minimal_qualification(dep.name()).to_string(),
5659                )],
5660            });
5661        }
5662    }
5663    // TODO(jkosh44) It would be nice to also check if any active subscribe or pending peek
5664    //  relies on entry. Unfortunately, we don't have that information readily available.
5665    Ok(())
5666}
5667
5668/// Does the dependency `dep` prevent a drop of a non-cascade query?
5669fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5670    match object_type {
5671        ObjectType::Type => true,
5672        ObjectType::Table
5673        | ObjectType::View
5674        | ObjectType::MaterializedView
5675        | ObjectType::Source
5676        | ObjectType::Sink
5677        | ObjectType::Index
5678        | ObjectType::Role
5679        | ObjectType::Cluster
5680        | ObjectType::ClusterReplica
5681        | ObjectType::Secret
5682        | ObjectType::Connection
5683        | ObjectType::Database
5684        | ObjectType::Schema
5685        | ObjectType::Func
5686        | ObjectType::NetworkPolicy => match dep.item_type() {
5687            CatalogItemType::Func
5688            | CatalogItemType::Table
5689            | CatalogItemType::Source
5690            | CatalogItemType::View
5691            | CatalogItemType::MaterializedView
5692            | CatalogItemType::Sink
5693            | CatalogItemType::Type
5694            | CatalogItemType::Secret
5695            | CatalogItemType::Connection => true,
5696            CatalogItemType::Index => false,
5697        },
5698    }
5699}
5700
5701pub fn describe_alter_index_options(
5702    _: &StatementContext,
5703    _: AlterIndexStatement<Aug>,
5704) -> Result<StatementDesc, PlanError> {
5705    Ok(StatementDesc::new(None))
5706}
5707
5708pub fn describe_drop_owned(
5709    _: &StatementContext,
5710    _: DropOwnedStatement<Aug>,
5711) -> Result<StatementDesc, PlanError> {
5712    Ok(StatementDesc::new(None))
5713}
5714
5715pub fn plan_drop_owned(
5716    scx: &StatementContext,
5717    drop: DropOwnedStatement<Aug>,
5718) -> Result<Plan, PlanError> {
5719    let cascade = drop.cascade();
5720    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5721    let mut drop_ids = Vec::new();
5722    let mut privilege_revokes = Vec::new();
5723    let mut default_privilege_revokes = Vec::new();
5724
5725    fn update_privilege_revokes(
5726        object_id: SystemObjectId,
5727        privileges: &PrivilegeMap,
5728        role_ids: &BTreeSet<RoleId>,
5729        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5730    ) {
5731        privilege_revokes.extend(iter::zip(
5732            iter::repeat(object_id),
5733            privileges
5734                .all_values()
5735                .filter(|privilege| role_ids.contains(&privilege.grantee))
5736                .cloned(),
5737        ));
5738    }
5739
5740    // Replicas
5741    for replica in scx.catalog.get_cluster_replicas() {
5742        if role_ids.contains(&replica.owner_id()) {
5743            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5744        }
5745    }
5746
5747    // Clusters
5748    for cluster in scx.catalog.get_clusters() {
5749        if role_ids.contains(&cluster.owner_id()) {
5750            // Note: CASCADE is not required for replicas.
5751            if !cascade {
5752                let non_owned_bound_objects: Vec<_> = cluster
5753                    .bound_objects()
5754                    .into_iter()
5755                    .map(|item_id| scx.catalog.get_item(item_id))
5756                    .filter(|item| !role_ids.contains(&item.owner_id()))
5757                    .collect();
5758                if !non_owned_bound_objects.is_empty() {
5759                    let names: Vec<_> = non_owned_bound_objects
5760                        .into_iter()
5761                        .map(|item| {
5762                            (
5763                                item.item_type().to_string(),
5764                                scx.catalog.resolve_full_name(item.name()).to_string(),
5765                            )
5766                        })
5767                        .collect();
5768                    return Err(PlanError::DependentObjectsStillExist {
5769                        object_type: "cluster".to_string(),
5770                        object_name: cluster.name().to_string(),
5771                        dependents: names,
5772                    });
5773                }
5774            }
5775            drop_ids.push(cluster.id().into());
5776        }
5777        update_privilege_revokes(
5778            SystemObjectId::Object(cluster.id().into()),
5779            cluster.privileges(),
5780            &role_ids,
5781            &mut privilege_revokes,
5782        );
5783    }
5784
5785    // Items
5786    for item in scx.catalog.get_items() {
5787        if role_ids.contains(&item.owner_id()) {
5788            if !cascade {
5789                // Checks if any items still depend on this one, returning an error if so.
5790                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5791                    let non_owned_dependencies: Vec<_> = used_by
5792                        .into_iter()
5793                        .map(|item_id| scx.catalog.get_item(item_id))
5794                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5795                        .filter(|item| !role_ids.contains(&item.owner_id()))
5796                        .collect();
5797                    if !non_owned_dependencies.is_empty() {
5798                        let names: Vec<_> = non_owned_dependencies
5799                            .into_iter()
5800                            .map(|item| {
5801                                let item_typ = item.item_type().to_string();
5802                                let item_name =
5803                                    scx.catalog.resolve_full_name(item.name()).to_string();
5804                                (item_typ, item_name)
5805                            })
5806                            .collect();
5807                        Err(PlanError::DependentObjectsStillExist {
5808                            object_type: item.item_type().to_string(),
5809                            object_name: scx
5810                                .catalog
5811                                .resolve_full_name(item.name())
5812                                .to_string()
5813                                .to_string(),
5814                            dependents: names,
5815                        })
5816                    } else {
5817                        Ok(())
5818                    }
5819                };
5820
5821                // When this item gets dropped it will also drop its progress source, so we need to
5822                // check the users of those.
5823                if let Some(id) = item.progress_id() {
5824                    let progress_item = scx.catalog.get_item(&id);
5825                    check_if_dependents_exist(progress_item.used_by())?;
5826                }
5827                check_if_dependents_exist(item.used_by())?;
5828            }
5829            drop_ids.push(item.id().into());
5830        }
5831        update_privilege_revokes(
5832            SystemObjectId::Object(item.id().into()),
5833            item.privileges(),
5834            &role_ids,
5835            &mut privilege_revokes,
5836        );
5837    }
5838
5839    // Schemas
5840    for schema in scx.catalog.get_schemas() {
5841        if !schema.id().is_temporary() {
5842            if role_ids.contains(&schema.owner_id()) {
5843                if !cascade {
5844                    let non_owned_dependencies: Vec<_> = schema
5845                        .item_ids()
5846                        .map(|item_id| scx.catalog.get_item(&item_id))
5847                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5848                        .filter(|item| !role_ids.contains(&item.owner_id()))
5849                        .collect();
5850                    if !non_owned_dependencies.is_empty() {
5851                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5852                        sql_bail!(
5853                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5854                            full_schema_name.to_string().quoted()
5855                        );
5856                    }
5857                }
5858                drop_ids.push((*schema.database(), *schema.id()).into())
5859            }
5860            update_privilege_revokes(
5861                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5862                schema.privileges(),
5863                &role_ids,
5864                &mut privilege_revokes,
5865            );
5866        }
5867    }
5868
5869    // Databases
5870    for database in scx.catalog.get_databases() {
5871        if role_ids.contains(&database.owner_id()) {
5872            if !cascade {
5873                let non_owned_schemas: Vec<_> = database
5874                    .schemas()
5875                    .into_iter()
5876                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
5877                    .collect();
5878                if !non_owned_schemas.is_empty() {
5879                    sql_bail!(
5880                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5881                        database.name().quoted(),
5882                    );
5883                }
5884            }
5885            drop_ids.push(database.id().into());
5886        }
5887        update_privilege_revokes(
5888            SystemObjectId::Object(database.id().into()),
5889            database.privileges(),
5890            &role_ids,
5891            &mut privilege_revokes,
5892        );
5893    }
5894
5895    // Network policies
5896    for network_policy in scx.catalog.get_network_policies() {
5897        if role_ids.contains(&network_policy.owner_id()) {
5898            drop_ids.push(ObjectId::NetworkPolicy(network_policy.id()));
5899        }
5900        update_privilege_revokes(
5901            SystemObjectId::Object(ObjectId::NetworkPolicy(network_policy.id())),
5902            network_policy.privileges(),
5903            &role_ids,
5904            &mut privilege_revokes,
5905        );
5906    }
5907
5908    // System
5909    update_privilege_revokes(
5910        SystemObjectId::System,
5911        scx.catalog.get_system_privileges(),
5912        &role_ids,
5913        &mut privilege_revokes,
5914    );
5915
5916    for (default_privilege_object, default_privilege_acl_items) in
5917        scx.catalog.get_default_privileges()
5918    {
5919        for default_privilege_acl_item in default_privilege_acl_items {
5920            if role_ids.contains(&default_privilege_object.role_id)
5921                || role_ids.contains(&default_privilege_acl_item.grantee)
5922            {
5923                default_privilege_revokes.push((
5924                    default_privilege_object.clone(),
5925                    default_privilege_acl_item.clone(),
5926                ));
5927            }
5928        }
5929    }
5930
5931    let drop_ids = scx.catalog.object_dependents(&drop_ids);
5932
5933    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5934    if !system_ids.is_empty() {
5935        let mut owners = system_ids
5936            .into_iter()
5937            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5938            .collect::<BTreeSet<_>>()
5939            .into_iter()
5940            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5941        sql_bail!(
5942            "cannot drop objects owned by role {} because they are required by the database system",
5943            owners.join(", "),
5944        );
5945    }
5946
5947    Ok(Plan::DropOwned(DropOwnedPlan {
5948        role_ids: role_ids.into_iter().collect(),
5949        drop_ids,
5950        privilege_revokes,
5951        default_privilege_revokes,
5952    }))
5953}
5954
5955fn plan_retain_history_option(
5956    scx: &StatementContext,
5957    retain_history: Option<OptionalDuration>,
5958) -> Result<Option<CompactionWindow>, PlanError> {
5959    if let Some(OptionalDuration(lcw)) = retain_history {
5960        Ok(Some(plan_retain_history(scx, lcw)?))
5961    } else {
5962        Ok(None)
5963    }
5964}
5965
5966// Convert a specified RETAIN HISTORY option into a compaction window. `None` corresponds to
5967// `DisableCompaction`. A zero duration will error. This is because the `OptionalDuration` type
5968// already converts the zero duration into `None`. This function must not be called in the `RESET
5969// (RETAIN HISTORY)` path, which should be handled by the outer `Option<OptionalDuration>` being
5970// `None`.
5971fn plan_retain_history(
5972    scx: &StatementContext,
5973    lcw: Option<Duration>,
5974) -> Result<CompactionWindow, PlanError> {
5975    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
5976    match lcw {
5977        // A zero duration has already been converted to `None` by `OptionalDuration` (and means
5978        // disable compaction), and should never occur here. Furthermore, some things actually do
5979        // break when this is set to real zero:
5980        // https://github.com/MaterializeInc/database-issues/issues/3798.
5981        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
5982            option_name: "RETAIN HISTORY".to_string(),
5983            err: Box::new(PlanError::Unstructured(
5984                "internal error: unexpectedly zero".to_string(),
5985            )),
5986        }),
5987        Some(duration) => {
5988            // Error if the duration is low and enable_unlimited_retain_history is not set (which
5989            // should only be possible during testing).
5990            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
5991                && scx
5992                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5993                    .is_err()
5994            {
5995                return Err(PlanError::RetainHistoryLow {
5996                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
5997                });
5998            }
5999            Ok(duration.try_into()?)
6000        }
6001        // In the past `RETAIN HISTORY FOR '0'` meant disable compaction. Disabling compaction seems
6002        // to be a bad choice, so prevent it.
6003        None => {
6004            if scx
6005                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
6006                .is_err()
6007            {
6008                Err(PlanError::RetainHistoryRequired)
6009            } else {
6010                Ok(CompactionWindow::DisableCompaction)
6011            }
6012        }
6013    }
6014}
6015
6016generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
6017
6018fn plan_index_options(
6019    scx: &StatementContext,
6020    with_opts: Vec<IndexOption<Aug>>,
6021) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
6022    if !with_opts.is_empty() {
6023        // Index options are not durable.
6024        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
6025    }
6026
6027    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
6028
6029    let mut out = Vec::with_capacity(1);
6030    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6031        out.push(crate::plan::IndexOption::RetainHistory(cw));
6032    }
6033    Ok(out)
6034}
6035
6036generate_extracted_config!(
6037    TableOption,
6038    (PartitionBy, Vec<Ident>),
6039    (RetainHistory, OptionalDuration),
6040    (RedactedTest, String)
6041);
6042
6043fn plan_table_options(
6044    scx: &StatementContext,
6045    desc: &RelationDesc,
6046    with_opts: Vec<TableOption<Aug>>,
6047) -> Result<Vec<crate::plan::TableOption>, PlanError> {
6048    let TableOptionExtracted {
6049        partition_by,
6050        retain_history,
6051        redacted_test,
6052        ..
6053    }: TableOptionExtracted = with_opts.try_into()?;
6054
6055    if let Some(partition_by) = partition_by {
6056        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
6057        check_partition_by(desc, partition_by)?;
6058    }
6059
6060    if redacted_test.is_some() {
6061        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
6062    }
6063
6064    let mut out = Vec::with_capacity(1);
6065    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6066        out.push(crate::plan::TableOption::RetainHistory(cw));
6067    }
6068    Ok(out)
6069}
6070
6071pub fn plan_alter_index_options(
6072    scx: &mut StatementContext,
6073    AlterIndexStatement {
6074        index_name,
6075        if_exists,
6076        action,
6077    }: AlterIndexStatement<Aug>,
6078) -> Result<Plan, PlanError> {
6079    let object_type = ObjectType::Index;
6080    match action {
6081        AlterIndexAction::ResetOptions(options) => {
6082            let mut options = options.into_iter();
6083            if let Some(opt) = options.next() {
6084                match opt {
6085                    IndexOptionName::RetainHistory => {
6086                        if options.next().is_some() {
6087                            sql_bail!("RETAIN HISTORY must be only option");
6088                        }
6089                        return alter_retain_history(
6090                            scx,
6091                            object_type,
6092                            if_exists,
6093                            UnresolvedObjectName::Item(index_name),
6094                            None,
6095                        );
6096                    }
6097                }
6098            }
6099            sql_bail!("expected option");
6100        }
6101        AlterIndexAction::SetOptions(options) => {
6102            let mut options = options.into_iter();
6103            if let Some(opt) = options.next() {
6104                match opt.name {
6105                    IndexOptionName::RetainHistory => {
6106                        if options.next().is_some() {
6107                            sql_bail!("RETAIN HISTORY must be only option");
6108                        }
6109                        return alter_retain_history(
6110                            scx,
6111                            object_type,
6112                            if_exists,
6113                            UnresolvedObjectName::Item(index_name),
6114                            opt.value,
6115                        );
6116                    }
6117                }
6118            }
6119            sql_bail!("expected option");
6120        }
6121    }
6122}
6123
6124pub fn describe_alter_cluster_set_options(
6125    _: &StatementContext,
6126    _: AlterClusterStatement<Aug>,
6127) -> Result<StatementDesc, PlanError> {
6128    Ok(StatementDesc::new(None))
6129}
6130
6131pub fn plan_alter_cluster(
6132    scx: &mut StatementContext,
6133    AlterClusterStatement {
6134        name,
6135        action,
6136        if_exists,
6137    }: AlterClusterStatement<Aug>,
6138) -> Result<Plan, PlanError> {
6139    let cluster = match resolve_cluster(scx, &name, if_exists)? {
6140        Some(entry) => entry,
6141        None => {
6142            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6143                name: name.to_ast_string_simple(),
6144                object_type: ObjectType::Cluster,
6145            });
6146
6147            return Ok(Plan::AlterNoop(AlterNoopPlan {
6148                object_type: ObjectType::Cluster,
6149            }));
6150        }
6151    };
6152
6153    let mut options: PlanClusterOption = Default::default();
6154    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
6155
6156    match action {
6157        AlterClusterAction::SetOptions {
6158            options: set_options,
6159            with_options,
6160        } => {
6161            let ClusterOptionExtracted {
6162                availability_zones,
6163                introspection_debugging,
6164                introspection_interval,
6165                managed,
6166                replicas: replica_defs,
6167                replication_factor,
6168                seen: _,
6169                size,
6170                disk,
6171                schedule,
6172                workload_class,
6173            }: ClusterOptionExtracted = set_options.try_into()?;
6174
6175            if !scx.catalog.active_role_id().is_system() {
6176                if workload_class.is_some() {
6177                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
6178                }
6179            }
6180
6181            match managed.unwrap_or_else(|| cluster.is_managed()) {
6182                true => {
6183                    let alter_strategy_extracted =
6184                        ClusterAlterOptionExtracted::try_from(with_options)?;
6185                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
6186
6187                    match alter_strategy {
6188                        AlterClusterPlanStrategy::None => {}
6189                        _ => {
6190                            scx.require_feature_flag(
6191                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
6192                            )?;
6193                        }
6194                    }
6195
6196                    if replica_defs.is_some() {
6197                        sql_bail!("REPLICAS not supported for managed clusters");
6198                    }
6199                    if schedule.is_some()
6200                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6201                    {
6202                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6203
6204                        // A cluster with a non-MANUAL schedule is automatically turned On/Off by
6205                        // the cluster scheduling policy, which means its replication factor is
6206                        // always 0 or 1. If the cluster currently has a higher replication factor
6207                        // and the user is not lowering it in the same statement (which would be
6208                        // rejected just below), then reject the schedule change: otherwise we'd
6209                        // leave the cluster in an invalid state with both a non-MANUAL schedule and
6210                        // a replication factor > 1 (which would, e.g., make SHOW CREATE CLUSTER
6211                        // panic).
6212                        if replication_factor.is_none()
6213                            && cluster.replication_factor().is_some_and(|rf| rf > 1)
6214                        {
6215                            sql_bail!(
6216                                "SCHEDULE cannot be set to anything other than MANUAL while the \
6217                                cluster's REPLICATION FACTOR is greater than 1; \
6218                                set the REPLICATION FACTOR to 1 first"
6219                            );
6220                        }
6221                    }
6222
6223                    if replication_factor.is_some() {
6224                        if schedule.is_some()
6225                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6226                        {
6227                            sql_bail!(
6228                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6229                            );
6230                        }
6231                        if let Some(current_schedule) = cluster.schedule() {
6232                            if !matches!(current_schedule, ClusterSchedule::Manual) {
6233                                sql_bail!(
6234                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6235                                );
6236                            }
6237                        }
6238                    }
6239                }
6240                false => {
6241                    if !alter_strategy.is_none() {
6242                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6243                    }
6244                    if availability_zones.is_some() {
6245                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6246                    }
6247                    if replication_factor.is_some() {
6248                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6249                    }
6250                    if introspection_debugging.is_some() {
6251                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6252                    }
6253                    if introspection_interval.is_some() {
6254                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6255                    }
6256                    if size.is_some() {
6257                        sql_bail!("SIZE not supported for unmanaged clusters");
6258                    }
6259                    if disk.is_some() {
6260                        sql_bail!("DISK not supported for unmanaged clusters");
6261                    }
6262                    if schedule.is_some()
6263                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6264                    {
6265                        sql_bail!(
6266                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6267                        );
6268                    }
6269                    if let Some(current_schedule) = cluster.schedule() {
6270                        if !matches!(current_schedule, ClusterSchedule::Manual)
6271                            && schedule.is_none()
6272                        {
6273                            sql_bail!(
6274                                "when switching a cluster to unmanaged, if the managed \
6275                                cluster's SCHEDULE is anything other than MANUAL, you have to \
6276                                explicitly set the SCHEDULE to MANUAL"
6277                            );
6278                        }
6279                    }
6280                }
6281            }
6282
6283            let mut replicas = vec![];
6284            for ReplicaDefinition { name, options } in
6285                replica_defs.into_iter().flat_map(Vec::into_iter)
6286            {
6287                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6288            }
6289
6290            if let Some(managed) = managed {
6291                options.managed = AlterOptionParameter::Set(managed);
6292            }
6293            if let Some(replication_factor) = replication_factor {
6294                options.replication_factor = AlterOptionParameter::Set(replication_factor);
6295            }
6296            if let Some(size) = &size {
6297                options.size = AlterOptionParameter::Set(size.clone());
6298            }
6299            if let Some(availability_zones) = availability_zones {
6300                options.availability_zones = AlterOptionParameter::Set(availability_zones);
6301            }
6302            if let Some(introspection_debugging) = introspection_debugging {
6303                options.introspection_debugging =
6304                    AlterOptionParameter::Set(introspection_debugging);
6305            }
6306            if let Some(introspection_interval) = introspection_interval {
6307                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6308            }
6309            if disk.is_some() {
6310                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
6311                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
6312                // we'll be able to remove the `DISK` option entirely.
6313                let size = match size.as_deref() {
6314                    Some(s) => s,
6315                    None => cluster
6316                        .managed_size()
6317                        .ok_or_else(|| sql_err!("cluster is not managed"))?,
6318                };
6319                if scx.catalog.is_cluster_size_cc(size) {
6320                    sql_bail!(
6321                        "DISK option not supported for modern cluster sizes because disk is always enabled"
6322                    );
6323                }
6324
6325                scx.catalog
6326                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
6327            }
6328            if !replicas.is_empty() {
6329                options.replicas = AlterOptionParameter::Set(replicas);
6330            }
6331            if let Some(schedule) = schedule {
6332                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6333            }
6334            if let Some(workload_class) = workload_class {
6335                options.workload_class = AlterOptionParameter::Set(workload_class.0);
6336            }
6337        }
6338        AlterClusterAction::ResetOptions(reset_options) => {
6339            use AlterOptionParameter::Reset;
6340            use ClusterOptionName::*;
6341
6342            if !scx.catalog.active_role_id().is_system() {
6343                if reset_options.contains(&WorkloadClass) {
6344                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
6345                }
6346            }
6347
6348            for option in reset_options {
6349                match option {
6350                    AvailabilityZones => options.availability_zones = Reset,
6351                    Disk => scx
6352                        .catalog
6353                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
6354                    IntrospectionInterval => options.introspection_interval = Reset,
6355                    IntrospectionDebugging => options.introspection_debugging = Reset,
6356                    Managed => options.managed = Reset,
6357                    Replicas => options.replicas = Reset,
6358                    ReplicationFactor => options.replication_factor = Reset,
6359                    Size => options.size = Reset,
6360                    Schedule => options.schedule = Reset,
6361                    WorkloadClass => options.workload_class = Reset,
6362                }
6363            }
6364        }
6365    }
6366    Ok(Plan::AlterCluster(AlterClusterPlan {
6367        id: cluster.id(),
6368        name: cluster.name().to_string(),
6369        options,
6370        strategy: alter_strategy,
6371    }))
6372}
6373
6374pub fn describe_alter_set_cluster(
6375    _: &StatementContext,
6376    _: AlterSetClusterStatement<Aug>,
6377) -> Result<StatementDesc, PlanError> {
6378    Ok(StatementDesc::new(None))
6379}
6380
6381pub fn plan_alter_item_set_cluster(
6382    scx: &StatementContext,
6383    AlterSetClusterStatement {
6384        if_exists,
6385        set_cluster: in_cluster_name,
6386        name,
6387        object_type,
6388    }: AlterSetClusterStatement<Aug>,
6389) -> Result<Plan, PlanError> {
6390    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6391
6392    let object_type = object_type.into();
6393
6394    // Prevent access to `SET CLUSTER` for unsupported objects.
6395    match object_type {
6396        ObjectType::MaterializedView => {}
6397        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6398            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6399        }
6400        ObjectType::Table
6401        | ObjectType::View
6402        | ObjectType::Type
6403        | ObjectType::Role
6404        | ObjectType::Cluster
6405        | ObjectType::ClusterReplica
6406        | ObjectType::Secret
6407        | ObjectType::Connection
6408        | ObjectType::Database
6409        | ObjectType::Schema
6410        | ObjectType::Func
6411        | ObjectType::NetworkPolicy => {
6412            bail_never_supported!(
6413                format!("ALTER {object_type} SET CLUSTER"),
6414                "sql/alter-set-cluster/",
6415                format!("{object_type} has no associated cluster")
6416            )
6417        }
6418    }
6419
6420    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6421
6422    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6423        Some(entry) => {
6424            let current_cluster = entry.cluster_id();
6425            let Some(current_cluster) = current_cluster else {
6426                sql_bail!("No cluster associated with {name}");
6427            };
6428
6429            if current_cluster == in_cluster.id() {
6430                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6431            } else {
6432                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6433                    id: entry.id(),
6434                    set_cluster: in_cluster.id(),
6435                }))
6436            }
6437        }
6438        None => {
6439            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6440                name: name.to_ast_string_simple(),
6441                object_type,
6442            });
6443
6444            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6445        }
6446    }
6447}
6448
6449pub fn describe_alter_object_rename(
6450    _: &StatementContext,
6451    _: AlterObjectRenameStatement,
6452) -> Result<StatementDesc, PlanError> {
6453    Ok(StatementDesc::new(None))
6454}
6455
6456pub fn plan_alter_object_rename(
6457    scx: &mut StatementContext,
6458    AlterObjectRenameStatement {
6459        name,
6460        object_type,
6461        to_item_name,
6462        if_exists,
6463    }: AlterObjectRenameStatement,
6464) -> Result<Plan, PlanError> {
6465    let object_type = object_type.into();
6466    match (object_type, name) {
6467        (
6468            ObjectType::View
6469            | ObjectType::MaterializedView
6470            | ObjectType::Table
6471            | ObjectType::Source
6472            | ObjectType::Index
6473            | ObjectType::Sink
6474            | ObjectType::Secret
6475            | ObjectType::Connection,
6476            UnresolvedObjectName::Item(name),
6477        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6478        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6479            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6480        }
6481        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6482            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6483        }
6484        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6485            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6486        }
6487        (object_type, name) => {
6488            // The earlier dispatch + name resolution should make this
6489            // combination impossible.
6490            bail_internal!("invalid object type '{object_type}' for ALTER RENAME with name {name}")
6491        }
6492    }
6493}
6494
6495pub fn plan_alter_schema_rename(
6496    scx: &mut StatementContext,
6497    name: UnresolvedSchemaName,
6498    to_schema_name: Ident,
6499    if_exists: bool,
6500) -> Result<Plan, PlanError> {
6501    // Special case for mz_temp: with lazy temporary schema creation, the temp
6502    // schema may not exist yet, but we still need to return the correct error.
6503    // Check the schema name directly against MZ_TEMP_SCHEMA.
6504    let normalized = normalize::unresolved_schema_name(name.clone())?;
6505    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6506        sql_bail!(
6507            "cannot rename schemas in the ambient database: {:?}",
6508            mz_repr::namespaces::MZ_TEMP_SCHEMA
6509        );
6510    }
6511
6512    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6513        let object_type = ObjectType::Schema;
6514        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6515            name: name.to_ast_string_simple(),
6516            object_type,
6517        });
6518        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6519    };
6520
6521    // Make sure the name is unique.
6522    if scx
6523        .resolve_schema_in_database(&db_spec, &to_schema_name)
6524        .is_ok()
6525    {
6526        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6527            to_schema_name.clone().into_string(),
6528        )));
6529    }
6530
6531    // Prevent users from renaming system related schemas.
6532    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6533    if schema.id().is_system() {
6534        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6535    }
6536
6537    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6538        cur_schema_spec: (db_spec, schema_spec),
6539        new_schema_name: to_schema_name.into_string(),
6540    }))
6541}
6542
6543pub fn plan_alter_schema_swap<F>(
6544    scx: &mut StatementContext,
6545    name_a: UnresolvedSchemaName,
6546    name_b: Ident,
6547    if_exists: bool,
6548    gen_temp_suffix: F,
6549) -> Result<Plan, PlanError>
6550where
6551    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6552{
6553    // Special case for mz_temp: with lazy temporary schema creation, the temp
6554    // schema may not exist yet, but we still need to return the correct error.
6555    // Check the schema name directly against MZ_TEMP_SCHEMA.
6556    let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
6557    if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
6558    {
6559        sql_bail!("cannot swap schemas that are in the ambient database");
6560    }
6561    // Also check name_b (the target schema name)
6562    let name_b_str = normalize::ident_ref(&name_b);
6563    if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6564        sql_bail!("cannot swap schemas that are in the ambient database");
6565    }
6566
6567    let schema_a = match scx.resolve_schema(name_a.clone()) {
6568        Ok(schema) => schema,
6569        Err(_) if if_exists => {
6570            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6571                name: name_a.to_ast_string_simple(),
6572                object_type: ObjectType::Schema,
6573            });
6574            return Ok(Plan::AlterNoop(AlterNoopPlan {
6575                object_type: ObjectType::Schema,
6576            }));
6577        }
6578        Err(e) => return Err(e),
6579    };
6580
6581    let db_spec = schema_a.database().clone();
6582    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6583        sql_bail!("cannot swap schemas that are in the ambient database");
6584    };
6585    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6586
6587    // We cannot swap system schemas.
6588    if schema_a.id().is_system() || schema_b.id().is_system() {
6589        bail_never_supported!("swapping a system schema".to_string())
6590    }
6591
6592    // Generate a temporary name we can swap schema_a to.
6593    //
6594    // 'check' returns if the temp schema name would be valid.
6595    const SCHEMA_SWAP_PREFIX: &str = "mz_schema_swap_";
6596    let check = |temp_suffix: &str| {
6597        let mut temp_name = ident!(SCHEMA_SWAP_PREFIX);
6598        temp_name.append_lossy(temp_suffix);
6599        scx.resolve_schema_in_database(&db_spec, &temp_name)
6600            .is_err()
6601    };
6602    let temp_suffix = gen_temp_suffix(&check)?;
6603    let name_temp = format!("{SCHEMA_SWAP_PREFIX}{temp_suffix}");
6604
6605    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6606        schema_a_spec: (*schema_a.database(), *schema_a.id()),
6607        schema_a_name: schema_a.name().schema.to_string(),
6608        schema_b_spec: (*schema_b.database(), *schema_b.id()),
6609        schema_b_name: schema_b.name().schema.to_string(),
6610        name_temp,
6611    }))
6612}
6613
6614pub fn plan_alter_item_rename(
6615    scx: &mut StatementContext,
6616    object_type: ObjectType,
6617    name: UnresolvedItemName,
6618    to_item_name: Ident,
6619    if_exists: bool,
6620) -> Result<Plan, PlanError> {
6621    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6622        Ok(r) => r,
6623        // Return a more helpful error on `DROP VIEW <materialized-view>`.
6624        Err(PlanError::MismatchedObjectType {
6625            name,
6626            is_type: ObjectType::MaterializedView,
6627            expected_type: ObjectType::View,
6628        }) => {
6629            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6630        }
6631        e => e?,
6632    };
6633
6634    match resolved {
6635        Some(entry) => {
6636            let full_name = scx.catalog.resolve_full_name(entry.name());
6637            let item_type = entry.item_type();
6638
6639            let proposed_name = QualifiedItemName {
6640                qualifiers: entry.name().qualifiers.clone(),
6641                item: to_item_name.clone().into_string(),
6642            };
6643
6644            // For PostgreSQL compatibility, items and types cannot have
6645            // overlapping names in a variety of situations. See the comment on
6646            // `CatalogItemType::conflicts_with_type` for details.
6647            let conflicting_type_exists;
6648            let conflicting_item_exists;
6649            if item_type == CatalogItemType::Type {
6650                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6651                conflicting_item_exists = scx
6652                    .catalog
6653                    .get_item_by_name(&proposed_name)
6654                    .map(|item| item.item_type().conflicts_with_type())
6655                    .unwrap_or(false);
6656            } else {
6657                conflicting_type_exists = item_type.conflicts_with_type()
6658                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
6659                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6660            };
6661            if conflicting_type_exists || conflicting_item_exists {
6662                sql_bail!("catalog item '{}' already exists", to_item_name);
6663            }
6664
6665            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6666                id: entry.id(),
6667                current_full_name: full_name,
6668                to_name: normalize::ident(to_item_name),
6669                object_type,
6670            }))
6671        }
6672        None => {
6673            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6674                name: name.to_ast_string_simple(),
6675                object_type,
6676            });
6677
6678            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6679        }
6680    }
6681}
6682
6683pub fn plan_alter_cluster_rename(
6684    scx: &mut StatementContext,
6685    object_type: ObjectType,
6686    name: Ident,
6687    to_name: Ident,
6688    if_exists: bool,
6689) -> Result<Plan, PlanError> {
6690    match resolve_cluster(scx, &name, if_exists)? {
6691        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6692            id: entry.id(),
6693            name: entry.name().to_string(),
6694            to_name: ident(to_name),
6695        })),
6696        None => {
6697            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6698                name: name.to_ast_string_simple(),
6699                object_type,
6700            });
6701
6702            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6703        }
6704    }
6705}
6706
6707pub fn plan_alter_cluster_swap<F>(
6708    scx: &mut StatementContext,
6709    name_a: Ident,
6710    name_b: Ident,
6711    if_exists: bool,
6712    gen_temp_suffix: F,
6713) -> Result<Plan, PlanError>
6714where
6715    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6716{
6717    let cluster_a = match scx.resolve_cluster(Some(&name_a)) {
6718        Ok(cluster) => cluster,
6719        Err(_) if if_exists => {
6720            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6721                name: name_a.to_ast_string_simple(),
6722                object_type: ObjectType::Cluster,
6723            });
6724            return Ok(Plan::AlterNoop(AlterNoopPlan {
6725                object_type: ObjectType::Cluster,
6726            }));
6727        }
6728        Err(e) => return Err(e),
6729    };
6730    let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6731
6732    const CLUSTER_SWAP_PREFIX: &str = "mz_cluster_swap_";
6733    let check = |temp_suffix: &str| {
6734        let mut temp_name = ident!(CLUSTER_SWAP_PREFIX);
6735        temp_name.append_lossy(temp_suffix);
6736        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6737            // Temp name does not exist, so we can use it.
6738            Err(CatalogError::UnknownCluster(_)) => true,
6739            // Temp name already exists!
6740            Ok(_) | Err(_) => false,
6741        }
6742    };
6743    let temp_suffix = gen_temp_suffix(&check)?;
6744    let name_temp = format!("{CLUSTER_SWAP_PREFIX}{temp_suffix}");
6745
6746    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6747        id_a: cluster_a.id(),
6748        id_b: cluster_b.id(),
6749        name_a: name_a.into_string(),
6750        name_b: name_b.into_string(),
6751        name_temp,
6752    }))
6753}
6754
6755pub fn plan_alter_cluster_replica_rename(
6756    scx: &mut StatementContext,
6757    object_type: ObjectType,
6758    name: QualifiedReplica,
6759    to_item_name: Ident,
6760    if_exists: bool,
6761) -> Result<Plan, PlanError> {
6762    match resolve_cluster_replica(scx, &name, if_exists)? {
6763        Some((cluster, replica)) => {
6764            ensure_cluster_is_not_managed(scx, cluster.id())?;
6765            Ok(Plan::AlterClusterReplicaRename(
6766                AlterClusterReplicaRenamePlan {
6767                    cluster_id: cluster.id(),
6768                    replica_id: replica,
6769                    name: QualifiedReplica {
6770                        cluster: Ident::new(cluster.name())?,
6771                        replica: name.replica,
6772                    },
6773                    to_name: normalize::ident(to_item_name),
6774                },
6775            ))
6776        }
6777        None => {
6778            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6779                name: name.to_ast_string_simple(),
6780                object_type,
6781            });
6782
6783            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6784        }
6785    }
6786}
6787
6788pub fn describe_alter_object_swap(
6789    _: &StatementContext,
6790    _: AlterObjectSwapStatement,
6791) -> Result<StatementDesc, PlanError> {
6792    Ok(StatementDesc::new(None))
6793}
6794
6795pub fn plan_alter_object_swap(
6796    scx: &mut StatementContext,
6797    stmt: AlterObjectSwapStatement,
6798) -> Result<Plan, PlanError> {
6799    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6800
6801    let AlterObjectSwapStatement {
6802        object_type,
6803        if_exists,
6804        name_a,
6805        name_b,
6806    } = stmt;
6807    let object_type = object_type.into();
6808
6809    // We'll try 10 times to generate a temporary suffix.
6810    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6811        let mut attempts = 0;
6812        let name_temp = loop {
6813            attempts += 1;
6814            if attempts > 10 {
6815                tracing::warn!("Unable to generate temp id for swapping");
6816                sql_bail!("unable to swap!");
6817            }
6818
6819            // Call the provided closure to make sure this name is unique!
6820            let short_id = mz_ore::id_gen::temp_id();
6821            if check_fn(&short_id) {
6822                break short_id;
6823            }
6824        };
6825
6826        Ok(name_temp)
6827    };
6828
6829    match (object_type, name_a, name_b) {
6830        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6831            plan_alter_schema_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
6832        }
6833        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6834            plan_alter_cluster_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
6835        }
6836        (ObjectType::Schema | ObjectType::Cluster, _, _) => {
6837            bail_internal!("name type does not match object type for ALTER SWAP")
6838        }
6839        (
6840            ObjectType::Table
6841            | ObjectType::View
6842            | ObjectType::MaterializedView
6843            | ObjectType::Source
6844            | ObjectType::Sink
6845            | ObjectType::Index
6846            | ObjectType::Type
6847            | ObjectType::Role
6848            | ObjectType::ClusterReplica
6849            | ObjectType::Secret
6850            | ObjectType::Connection
6851            | ObjectType::Database
6852            | ObjectType::Func
6853            | ObjectType::NetworkPolicy,
6854            _,
6855            _,
6856        ) => Err(PlanError::Unsupported {
6857            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6858            discussion_no: None,
6859        }),
6860    }
6861}
6862
6863pub fn describe_alter_retain_history(
6864    _: &StatementContext,
6865    _: AlterRetainHistoryStatement<Aug>,
6866) -> Result<StatementDesc, PlanError> {
6867    Ok(StatementDesc::new(None))
6868}
6869
6870pub fn plan_alter_retain_history(
6871    scx: &StatementContext,
6872    AlterRetainHistoryStatement {
6873        object_type,
6874        if_exists,
6875        name,
6876        history,
6877    }: AlterRetainHistoryStatement<Aug>,
6878) -> Result<Plan, PlanError> {
6879    alter_retain_history(scx, object_type.into(), if_exists, name, history)
6880}
6881
6882fn alter_retain_history(
6883    scx: &StatementContext,
6884    object_type: ObjectType,
6885    if_exists: bool,
6886    name: UnresolvedObjectName,
6887    history: Option<WithOptionValue<Aug>>,
6888) -> Result<Plan, PlanError> {
6889    let name = match (object_type, name) {
6890        (
6891            // View gets a special error below.
6892            ObjectType::View
6893            | ObjectType::MaterializedView
6894            | ObjectType::Table
6895            | ObjectType::Source
6896            | ObjectType::Index,
6897            UnresolvedObjectName::Item(name),
6898        ) => name,
6899        (object_type, _) => {
6900            bail_unsupported!(format!("RETAIN HISTORY on {object_type}"))
6901        }
6902    };
6903    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6904        Some(entry) => {
6905            let full_name = scx.catalog.resolve_full_name(entry.name());
6906            let item_type = entry.item_type();
6907
6908            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
6909            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
6910                return Err(PlanError::AlterViewOnMaterializedView(
6911                    full_name.to_string(),
6912                ));
6913            } else if object_type == ObjectType::View {
6914                sql_bail!("{object_type} does not support RETAIN HISTORY")
6915            } else if object_type != item_type {
6916                sql_bail!(
6917                    "\"{}\" is a {} not a {}",
6918                    full_name,
6919                    entry.item_type(),
6920                    format!("{object_type}").to_lowercase()
6921                )
6922            }
6923
6924            // Save the original value so we can write it back down in the create_sql catalog item.
6925            let (value, lcw) = match &history {
6926                Some(WithOptionValue::RetainHistoryFor(value)) => {
6927                    let window = OptionalDuration::try_from_value(value.clone())?;
6928                    (Some(value.clone()), window.0)
6929                }
6930                // None is RESET, so use the default CW.
6931                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6932                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6933            };
6934            let window = plan_retain_history(scx, lcw)?;
6935
6936            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6937                id: entry.id(),
6938                value,
6939                window,
6940                object_type,
6941            }))
6942        }
6943        None => {
6944            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6945                name: name.to_ast_string_simple(),
6946                object_type,
6947            });
6948
6949            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6950        }
6951    }
6952}
6953
6954fn alter_source_timestamp_interval(
6955    scx: &StatementContext,
6956    if_exists: bool,
6957    source_name: UnresolvedItemName,
6958    value: Option<WithOptionValue<Aug>>,
6959) -> Result<Plan, PlanError> {
6960    let object_type = ObjectType::Source;
6961    match resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)? {
6962        Some(entry) => {
6963            let full_name = scx.catalog.resolve_full_name(entry.name());
6964            if entry.item_type() != CatalogItemType::Source {
6965                sql_bail!(
6966                    "\"{}\" is a {} not a {}",
6967                    full_name,
6968                    entry.item_type(),
6969                    format!("{object_type}").to_lowercase()
6970                )
6971            }
6972
6973            match value {
6974                Some(val) => {
6975                    let val = match val {
6976                        WithOptionValue::Value(v) => v,
6977                        _ => sql_bail!("TIMESTAMP INTERVAL requires an interval value"),
6978                    };
6979                    let duration = Duration::try_from_value(val.clone())?;
6980
6981                    let min = scx.catalog.system_vars().min_timestamp_interval();
6982                    let max = scx.catalog.system_vars().max_timestamp_interval();
6983                    if duration < min || duration > max {
6984                        return Err(PlanError::InvalidTimestampInterval {
6985                            min,
6986                            max,
6987                            requested: duration,
6988                        });
6989                    }
6990
6991                    Ok(Plan::AlterSourceTimestampInterval(
6992                        AlterSourceTimestampIntervalPlan {
6993                            id: entry.id(),
6994                            value: Some(val),
6995                            interval: duration,
6996                        },
6997                    ))
6998                }
6999                None => {
7000                    let interval = scx.catalog.system_vars().default_timestamp_interval();
7001                    Ok(Plan::AlterSourceTimestampInterval(
7002                        AlterSourceTimestampIntervalPlan {
7003                            id: entry.id(),
7004                            value: None,
7005                            interval,
7006                        },
7007                    ))
7008                }
7009            }
7010        }
7011        None => {
7012            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7013                name: source_name.to_ast_string_simple(),
7014                object_type,
7015            });
7016
7017            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
7018        }
7019    }
7020}
7021
7022pub fn describe_alter_secret_options(
7023    _: &StatementContext,
7024    _: AlterSecretStatement<Aug>,
7025) -> Result<StatementDesc, PlanError> {
7026    Ok(StatementDesc::new(None))
7027}
7028
7029pub fn plan_alter_secret(
7030    scx: &mut StatementContext,
7031    stmt: AlterSecretStatement<Aug>,
7032) -> Result<Plan, PlanError> {
7033    let AlterSecretStatement {
7034        name,
7035        if_exists,
7036        value,
7037    } = stmt;
7038    let object_type = ObjectType::Secret;
7039    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7040        Some(entry) => entry.id(),
7041        None => {
7042            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7043                name: name.to_string(),
7044                object_type,
7045            });
7046
7047            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7048        }
7049    };
7050
7051    let secret_as = query::plan_secret_as(scx, value)?;
7052
7053    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
7054}
7055
7056pub fn describe_alter_connection(
7057    _: &StatementContext,
7058    _: AlterConnectionStatement<Aug>,
7059) -> Result<StatementDesc, PlanError> {
7060    Ok(StatementDesc::new(None))
7061}
7062
// Extracts the `WITH (...)` options accepted by `ALTER CONNECTION` into
// `AlterConnectionOptionExtracted`. Its `validate` field is `None` when the
// option was not given; `plan_alter_connection` uses it to override the default
// validation behavior.
generate_extracted_config!(AlterConnectionOption, (Validate, bool));
7064
/// Plans `ALTER CONNECTION`.
///
/// Two shapes are accepted:
/// * `ROTATE KEYS`: must be the only action, takes no `WITH` options, and is
///   only valid on SSH connections.
/// * A list of `SET`/`DROP` option actions. These are checked against
///   `INALTERABLE_OPTIONS` and against the options valid for the connection's
///   type. They are also checked for options that are both SET and DROPped,
///   and for conflicting members of `MUTUALLY_EXCLUSIVE_SETS`. The resulting
///   plan carries the options to set, the options to drop, and whether to
///   validate the altered connection.
///
/// If resolution fails and `IF EXISTS` was given, a notice is emitted and a
/// no-op plan is returned. Note that any resolution error, not only "not
/// found", takes this path.
pub fn plan_alter_connection(
    scx: &StatementContext,
    stmt: AlterConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterConnectionStatement {
        name,
        if_exists,
        actions,
        with_options,
    } = stmt;
    let conn_name = normalize::unresolved_item_name(name)?;
    let entry = match scx.catalog.resolve_item(&conn_name) {
        Ok(entry) => entry,
        Err(_) if if_exists => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: conn_name.to_string(),
                object_type: ObjectType::Connection,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Connection,
            }));
        }
        Err(e) => return Err(e.into()),
    };

    let connection = entry.connection()?;

    // ROTATE KEYS is handled entirely here and may not be combined with any
    // other action or WITH option.
    if actions
        .iter()
        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
    {
        if actions.len() > 1 {
            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
        }

        if !with_options.is_empty() {
            sql_bail!(
                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
                with_options
                    .iter()
                    .map(|o| o.to_ast_string_simple())
                    .join(", ")
            );
        }

        if !matches!(connection, Connection::Ssh(_)) {
            sql_bail!(
                "{} is not an SSH connection",
                scx.catalog.resolve_full_name(entry.name())
            )
        }

        return Ok(Plan::AlterConnection(AlterConnectionPlan {
            id: entry.id(),
            action: crate::plan::AlterConnectionAction::RotateKeys,
        }));
    }

    // An explicit VALIDATE option is gated behind a feature flag.
    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }

    // An explicit VALIDATE wins. Otherwise validate only when default
    // validation is enabled system-wide AND this connection validates by
    // default.
    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && connection.validate_by_default()
        }
    };

    // Map the existing connection onto the CREATE CONNECTION type; this is
    // used for the per-type option checks and in error messages below.
    let connection_type = match connection {
        Connection::Aws(_) => CreateConnectionType::Aws,
        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
        Connection::Gcp(_) => CreateConnectionType::Gcp,
        Connection::Kafka(_) => CreateConnectionType::Kafka,
        Connection::Csr(_) => CreateConnectionType::Csr,
        Connection::GlueSchemaRegistry(_) => CreateConnectionType::GlueSchemaRegistry,
        Connection::Postgres(_) => CreateConnectionType::Postgres,
        Connection::Ssh(_) => CreateConnectionType::Ssh,
        Connection::MySql(_) => CreateConnectionType::MySql,
        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
    };

    // Collect all options irrespective of action taken on them.
    let specified_options: BTreeSet<_> = actions
        .iter()
        .map(|action: &AlterConnectionAction<Aug>| match action {
            AlterConnectionAction::SetOption(option) => Ok(option.name.clone()),
            AlterConnectionAction::DropOption(name) => Ok(name.clone()),
            // Unreachable: ROTATE KEYS returned early above.
            AlterConnectionAction::RotateKeys => {
                Err(internal_err!("RotateKeys is handled separately above"))
            }
        })
        .collect::<Result<_, PlanError>>()?;

    // Some options may never be changed after creation, whether SET or DROP.
    for invalid in INALTERABLE_OPTIONS {
        if specified_options.contains(invalid) {
            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
        }
    }

    connection::validate_options_per_connection_type(connection_type, specified_options)?;

    // Partition operations into set and drop.
    let mut set_options_vec: Vec<_> = Vec::new();
    let mut drop_options: BTreeSet<_> = BTreeSet::new();
    for action in actions {
        match action {
            AlterConnectionAction::SetOption(option) => set_options_vec.push(option),
            AlterConnectionAction::DropOption(name) => {
                drop_options.insert(name);
            }
            // Unreachable: ROTATE KEYS returned early above.
            AlterConnectionAction::RotateKeys => {
                bail_internal!("RotateKeys is handled separately above")
            }
        }
    }

    let set_options: BTreeMap<_, _> = set_options_vec
        .clone()
        .into_iter()
        .map(|option| (option.name, option.value))
        .collect();

    // Type check the SET values by running them through the connection option
    // extractor. This presumably also rejects an option SET more than once;
    // confirm in `generate_extracted_config!`. The extractor's `seen` set is then
    // intersected with the DROP/RESET set, so an option may not be both set and
    // dropped in the same statement.
    let connection_options_extracted =
        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;

    let duplicates: Vec<_> = connection_options_extracted
        .seen
        .intersection(&drop_options)
        .collect();

    if !duplicates.is_empty() {
        sql_bail!(
            "cannot both SET and DROP/RESET options {}",
            duplicates
                .iter()
                .map(|option| option.to_string())
                .join(", ")
        )
    }

    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
        let set_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| set_options.contains_key(o))
            .count();
        let drop_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| drop_options.contains(o))
            .count();

        // Disallow setting _and_ resetting mutually exclusive options
        if set_options_count > 0 && drop_options_count > 0 {
            sql_bail!(
                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
                connection_type,
                mutually_exclusive_options
                    .iter()
                    .map(|option| option.to_string())
                    .join(", ")
            )
        }

        // If any option is either set or dropped, ensure all mutually exclusive
        // options are dropped. We do this "behind the scenes", even though we
        // disallow users from performing the same action because this is the
        // mechanism by which we overwrite values elsewhere in the code.
        if set_options_count > 0 || drop_options_count > 0 {
            drop_options.extend(mutually_exclusive_options.iter().cloned());
        }

        // n.b. if mutually exclusive options are set, those will error when we
        // try to replan the connection.
    }

    Ok(Plan::AlterConnection(AlterConnectionPlan {
        id: entry.id(),
        action: crate::plan::AlterConnectionAction::AlterOptions {
            set_options,
            drop_options,
            validate,
        },
    }))
}
7258
7259pub fn describe_alter_sink(
7260    _: &StatementContext,
7261    _: AlterSinkStatement<Aug>,
7262) -> Result<StatementDesc, PlanError> {
7263    Ok(StatementDesc::new(None))
7264}
7265
7266pub fn plan_alter_sink(
7267    scx: &mut StatementContext,
7268    stmt: AlterSinkStatement<Aug>,
7269) -> Result<Plan, PlanError> {
7270    let AlterSinkStatement {
7271        sink_name,
7272        if_exists,
7273        action,
7274    } = stmt;
7275
7276    let object_type = ObjectType::Sink;
7277    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;
7278
7279    let Some(item) = item else {
7280        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7281            name: sink_name.to_string(),
7282            object_type,
7283        });
7284
7285        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7286    };
7287    // Always ALTER objects from their latest version.
7288    let item = item.at_version(RelationVersionSelector::Latest);
7289
7290    match action {
7291        AlterSinkAction::ChangeRelation(new_from) => {
7292            // First we reconstruct the original CREATE SINK statement
7293            let create_sql = item.create_sql();
7294            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
7295            let [stmt]: [StatementParseResult; 1] = stmts
7296                .try_into()
7297                .map_err(|_| internal_err!("create SQL of sink was not exactly one statement"))?;
7298            let Statement::CreateSink(stmt) = stmt.ast else {
7299                bail_internal!("create SQL of sink is not a CREATE SINK statement");
7300            };
7301
7302            // Then resolve and swap the resolved from relation to the new one
7303            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
7304            stmt.from = new_from;
7305
7306            // Finally re-plan the modified create sink statement to verify the new configuration is valid
7307            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
7308                bail_internal!("plan_sink did not produce a CreateSink plan");
7309            };
7310
7311            plan.sink.version += 1;
7312
7313            Ok(Plan::AlterSink(AlterSinkPlan {
7314                item_id: item.id(),
7315                global_id: item.global_id(),
7316                sink: plan.sink,
7317                with_snapshot: plan.with_snapshot,
7318                in_cluster: plan.in_cluster,
7319            }))
7320        }
7321        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
7322        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
7323    }
7324}
7325
7326pub fn describe_alter_source(
7327    _: &StatementContext,
7328    _: AlterSourceStatement<Aug>,
7329) -> Result<StatementDesc, PlanError> {
7330    // TODO: put the options here, right?
7331    Ok(StatementDesc::new(None))
7332}
7333
// Extracted options for `ALTER SOURCE ... ADD SUBSOURCE`. `TEXT COLUMNS` and
// `EXCLUDE COLUMNS` default to empty lists; `DETAILS` has no default
// (presumably surfaced as `Option<String>`).
// NOTE(review): the generated struct is not referenced in this part of the file;
// it is presumably consumed during purification. Confirm.
generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);
7340
7341pub fn plan_alter_source(
7342    scx: &mut StatementContext,
7343    stmt: AlterSourceStatement<Aug>,
7344) -> Result<Plan, PlanError> {
7345    let AlterSourceStatement {
7346        source_name,
7347        if_exists,
7348        action,
7349    } = stmt;
7350    let object_type = ObjectType::Source;
7351
7352    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
7353        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7354            name: source_name.to_string(),
7355            object_type,
7356        });
7357
7358        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7359    }
7360
7361    match action {
7362        AlterSourceAction::SetOptions(options) => {
7363            let mut options = options.into_iter();
7364            let option = options
7365                .next()
7366                .ok_or_else(|| sql_err!("ALTER SOURCE SET requires at least one option"))?;
7367            if option.name == CreateSourceOptionName::RetainHistory {
7368                if options.next().is_some() {
7369                    sql_bail!("RETAIN HISTORY must be only option");
7370                }
7371                return alter_retain_history(
7372                    scx,
7373                    object_type,
7374                    if_exists,
7375                    UnresolvedObjectName::Item(source_name),
7376                    option.value,
7377                );
7378            }
7379            if option.name == CreateSourceOptionName::TimestampInterval {
7380                if options.next().is_some() {
7381                    sql_bail!("TIMESTAMP INTERVAL must be only option");
7382                }
7383                return alter_source_timestamp_interval(scx, if_exists, source_name, option.value);
7384            }
7385            // n.b we use this statement in purification in a way that cannot be
7386            // planned directly.
7387            sql_bail!(
7388                "Cannot modify the {} of a SOURCE.",
7389                option.name.to_ast_string_simple()
7390            );
7391        }
7392        AlterSourceAction::ResetOptions(reset) => {
7393            let mut options = reset.into_iter();
7394            let option = options
7395                .next()
7396                .ok_or_else(|| sql_err!("ALTER SOURCE RESET requires at least one option"))?;
7397            if option == CreateSourceOptionName::RetainHistory {
7398                if options.next().is_some() {
7399                    sql_bail!("RETAIN HISTORY must be only option");
7400                }
7401                return alter_retain_history(
7402                    scx,
7403                    object_type,
7404                    if_exists,
7405                    UnresolvedObjectName::Item(source_name),
7406                    None,
7407                );
7408            }
7409            if option == CreateSourceOptionName::TimestampInterval {
7410                if options.next().is_some() {
7411                    sql_bail!("TIMESTAMP INTERVAL must be only option");
7412                }
7413                return alter_source_timestamp_interval(scx, if_exists, source_name, None);
7414            }
7415            sql_bail!(
7416                "Cannot modify the {} of a SOURCE.",
7417                option.to_ast_string_simple()
7418            );
7419        }
7420        AlterSourceAction::DropSubsources { .. } => {
7421            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
7422        }
7423        AlterSourceAction::AddSubsources { .. } => {
7424            sql_bail!("ALTER SOURCE...ADD SUBSOURCE must be purified before planning")
7425        }
7426        AlterSourceAction::RefreshReferences => {
7427            sql_bail!("ALTER SOURCE...REFRESH REFERENCES must be purified before planning")
7428        }
7429    };
7430}
7431
7432pub fn describe_alter_system_set(
7433    _: &StatementContext,
7434    _: AlterSystemSetStatement,
7435) -> Result<StatementDesc, PlanError> {
7436    Ok(StatementDesc::new(None))
7437}
7438
7439pub fn plan_alter_system_set(
7440    _: &StatementContext,
7441    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7442) -> Result<Plan, PlanError> {
7443    let name = name.to_string();
7444    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7445        name,
7446        value: scl::plan_set_variable_to(to)?,
7447    }))
7448}
7449
7450pub fn describe_alter_system_reset(
7451    _: &StatementContext,
7452    _: AlterSystemResetStatement,
7453) -> Result<StatementDesc, PlanError> {
7454    Ok(StatementDesc::new(None))
7455}
7456
7457pub fn plan_alter_system_reset(
7458    _: &StatementContext,
7459    AlterSystemResetStatement { name }: AlterSystemResetStatement,
7460) -> Result<Plan, PlanError> {
7461    let name = name.to_string();
7462    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7463}
7464
7465pub fn describe_alter_system_reset_all(
7466    _: &StatementContext,
7467    _: AlterSystemResetAllStatement,
7468) -> Result<StatementDesc, PlanError> {
7469    Ok(StatementDesc::new(None))
7470}
7471
7472pub fn plan_alter_system_reset_all(
7473    _: &StatementContext,
7474    _: AlterSystemResetAllStatement,
7475) -> Result<Plan, PlanError> {
7476    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7477}
7478
7479pub fn describe_alter_role(
7480    _: &StatementContext,
7481    _: AlterRoleStatement<Aug>,
7482) -> Result<StatementDesc, PlanError> {
7483    Ok(StatementDesc::new(None))
7484}
7485
7486pub fn plan_alter_role(
7487    scx: &StatementContext,
7488    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7489) -> Result<Plan, PlanError> {
7490    let option = match option {
7491        AlterRoleOption::Attributes(attrs) => {
7492            let attrs = plan_role_attributes(attrs, scx)?;
7493            PlannedAlterRoleOption::Attributes(attrs)
7494        }
7495        AlterRoleOption::Variable(variable) => {
7496            let var = plan_role_variable(scx, variable)?;
7497            PlannedAlterRoleOption::Variable(var)
7498        }
7499    };
7500
7501    Ok(Plan::AlterRole(AlterRolePlan {
7502        id: name.id,
7503        name: name.name,
7504        option,
7505    }))
7506}
7507
7508pub fn describe_alter_table_add_column(
7509    _: &StatementContext,
7510    _: AlterTableAddColumnStatement<Aug>,
7511) -> Result<StatementDesc, PlanError> {
7512    Ok(StatementDesc::new(None))
7513}
7514
7515pub fn plan_alter_table_add_column(
7516    scx: &StatementContext,
7517    stmt: AlterTableAddColumnStatement<Aug>,
7518) -> Result<Plan, PlanError> {
7519    let AlterTableAddColumnStatement {
7520        if_exists,
7521        name,
7522        if_col_not_exist,
7523        column_name,
7524        data_type,
7525    } = stmt;
7526    let object_type = ObjectType::Table;
7527
7528    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;
7529
7530    let (relation_id, item_name, desc) =
7531        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7532            Some(item) => {
7533                // Always add columns to the latest version of the item.
7534                let item_name = scx.catalog.resolve_full_name(item.name());
7535                let item = item.at_version(RelationVersionSelector::Latest);
7536                let desc = item
7537                    .relation_desc()
7538                    .ok_or_else(|| sql_err!("item does not have a relation description"))?
7539                    .into_owned();
7540                (item.id(), item_name, desc)
7541            }
7542            None => {
7543                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7544                    name: name.to_ast_string_simple(),
7545                    object_type,
7546                });
7547                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7548            }
7549        };
7550
7551    let column_name = ColumnName::from(column_name.as_str());
7552    if desc.get_by_name(&column_name).is_some() {
7553        if if_col_not_exist {
7554            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
7555                column_name: column_name.to_string(),
7556                object_name: item_name.item,
7557            });
7558            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7559        } else {
7560            return Err(PlanError::ColumnAlreadyExists {
7561                column_name,
7562                object_name: item_name.item,
7563            });
7564        }
7565    }
7566
7567    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
7568    // TODO(alter_table): Support non-nullable columns with default values.
7569    let column_type = scalar_type.nullable(true);
7570    // "unresolve" our data type so we can later update the persisted create_sql.
7571    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;
7572
7573    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
7574        relation_id,
7575        column_name,
7576        column_type,
7577        raw_sql_type,
7578    }))
7579}
7580
7581pub fn describe_alter_materialized_view_apply_replacement(
7582    _: &StatementContext,
7583    _: AlterMaterializedViewApplyReplacementStatement,
7584) -> Result<StatementDesc, PlanError> {
7585    Ok(StatementDesc::new(None))
7586}
7587
7588pub fn plan_alter_materialized_view_apply_replacement(
7589    scx: &StatementContext,
7590    stmt: AlterMaterializedViewApplyReplacementStatement,
7591) -> Result<Plan, PlanError> {
7592    let AlterMaterializedViewApplyReplacementStatement {
7593        if_exists,
7594        name,
7595        replacement_name,
7596    } = stmt;
7597
7598    scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
7599
7600    let object_type = ObjectType::MaterializedView;
7601    let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
7602        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7603            name: name.to_ast_string_simple(),
7604            object_type,
7605        });
7606        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7607    };
7608
7609    let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
7610        .ok_or_else(|| sql_err!("replacement materialized view does not exist"))?;
7611
7612    if replacement.replacement_target() != Some(mv.id()) {
7613        return Err(PlanError::InvalidReplacement {
7614            item_type: mv.item_type(),
7615            item_name: scx.catalog.minimal_qualification(mv.name()),
7616            replacement_type: replacement.item_type(),
7617            replacement_name: scx.catalog.minimal_qualification(replacement.name()),
7618        });
7619    }
7620
7621    Ok(Plan::AlterMaterializedViewApplyReplacement(
7622        AlterMaterializedViewApplyReplacementPlan {
7623            id: mv.id(),
7624            replacement_id: replacement.id(),
7625        },
7626    ))
7627}
7628
7629pub fn describe_comment(
7630    _: &StatementContext,
7631    _: CommentStatement<Aug>,
7632) -> Result<StatementDesc, PlanError> {
7633    Ok(StatementDesc::new(None))
7634}
7635
7636pub fn plan_comment(
7637    scx: &mut StatementContext,
7638    stmt: CommentStatement<Aug>,
7639) -> Result<Plan, PlanError> {
7640    const MAX_COMMENT_LENGTH: usize = 1024;
7641
7642    let CommentStatement { object, comment } = stmt;
7643
7644    // TODO(parkmycar): Make max comment length configurable.
7645    if let Some(c) = &comment {
7646        if c.len() > 1024 {
7647            return Err(PlanError::CommentTooLong {
7648                length: c.len(),
7649                max_size: MAX_COMMENT_LENGTH,
7650            });
7651        }
7652    }
7653
7654    let (object_id, column_pos) = match &object {
7655        com_ty @ CommentObjectType::Table { name }
7656        | com_ty @ CommentObjectType::View { name }
7657        | com_ty @ CommentObjectType::MaterializedView { name }
7658        | com_ty @ CommentObjectType::Index { name }
7659        | com_ty @ CommentObjectType::Func { name }
7660        | com_ty @ CommentObjectType::Connection { name }
7661        | com_ty @ CommentObjectType::Source { name }
7662        | com_ty @ CommentObjectType::Sink { name }
7663        | com_ty @ CommentObjectType::Secret { name } => {
7664            let item = scx.get_item_by_resolved_name(name)?;
7665            match (com_ty, item.item_type()) {
7666                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7667                    (CommentObjectId::Table(item.id()), None)
7668                }
7669                (CommentObjectType::View { .. }, CatalogItemType::View) => {
7670                    (CommentObjectId::View(item.id()), None)
7671                }
7672                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7673                    (CommentObjectId::MaterializedView(item.id()), None)
7674                }
7675                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7676                    (CommentObjectId::Index(item.id()), None)
7677                }
7678                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7679                    (CommentObjectId::Func(item.id()), None)
7680                }
7681                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7682                    (CommentObjectId::Connection(item.id()), None)
7683                }
7684                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7685                    (CommentObjectId::Source(item.id()), None)
7686                }
7687                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7688                    (CommentObjectId::Sink(item.id()), None)
7689                }
7690                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7691                    (CommentObjectId::Secret(item.id()), None)
7692                }
7693                (com_ty, cat_ty) => {
7694                    let expected_type = match com_ty {
7695                        CommentObjectType::Table { .. } => ObjectType::Table,
7696                        CommentObjectType::View { .. } => ObjectType::View,
7697                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7698                        CommentObjectType::Index { .. } => ObjectType::Index,
7699                        CommentObjectType::Func { .. } => ObjectType::Func,
7700                        CommentObjectType::Connection { .. } => ObjectType::Connection,
7701                        CommentObjectType::Source { .. } => ObjectType::Source,
7702                        CommentObjectType::Sink { .. } => ObjectType::Sink,
7703                        CommentObjectType::Secret { .. } => ObjectType::Secret,
7704                        _ => sql_bail!("cannot comment on this object type"),
7705                    };
7706
7707                    return Err(PlanError::InvalidObjectType {
7708                        expected_type: SystemObjectType::Object(expected_type),
7709                        actual_type: SystemObjectType::Object(cat_ty.into()),
7710                        object_name: item.name().item.clone(),
7711                    });
7712                }
7713            }
7714        }
7715        CommentObjectType::Type { ty } => match ty {
7716            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7717                sql_bail!("cannot comment on anonymous list or map type");
7718            }
7719            ResolvedDataType::Named { id, modifiers, .. } => {
7720                if !modifiers.is_empty() {
7721                    sql_bail!("cannot comment on type with modifiers");
7722                }
7723                (CommentObjectId::Type(*id), None)
7724            }
7725            ResolvedDataType::Error => bail_internal!("unresolved data type"),
7726        },
7727        CommentObjectType::Column { name } => {
7728            let (item, pos) = scx.get_column_by_resolved_name(name)?;
7729            match item.item_type() {
7730                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7731                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7732                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7733                CatalogItemType::MaterializedView => {
7734                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7735                }
7736                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7737                r => {
7738                    return Err(PlanError::Unsupported {
7739                        feature: format!("Specifying comments on a column of {r}"),
7740                        discussion_no: None,
7741                    });
7742                }
7743            }
7744        }
7745        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7746        CommentObjectType::Database { name } => {
7747            (CommentObjectId::Database(*name.database_id()), None)
7748        }
7749        CommentObjectType::Schema { name } => {
7750            // Temporary schemas cannot have comments - they are connection-specific
7751            // and transient. With lazy temporary schema creation, the temp schema
7752            // may not exist yet, but we still need to return the correct error.
7753            if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
7754                sql_bail!(
7755                    "cannot comment on schema {} because it is a temporary schema",
7756                    mz_repr::namespaces::MZ_TEMP_SCHEMA
7757                );
7758            }
7759            (
7760                CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7761                None,
7762            )
7763        }
7764        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7765        CommentObjectType::ClusterReplica { name } => {
7766            let replica = scx.catalog.resolve_cluster_replica(name)?;
7767            (
7768                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7769                None,
7770            )
7771        }
7772        CommentObjectType::NetworkPolicy { name } => {
7773            (CommentObjectId::NetworkPolicy(name.id), None)
7774        }
7775    };
7776
    // Note: the `mz_comments` table uses an `Int4` for the column position, but in the catalog storage we
    // store a `usize` which would be a `Uint8`. We check here that the position fits in an `i32`, because
    // this is the easiest place to raise an error.
7780    //
7781    // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
7782    if let Some(p) = column_pos {
7783        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7784            max_num_columns: MAX_NUM_COLUMNS,
7785            req_num_columns: p,
7786        })?;
7787    }
7788
7789    Ok(Plan::Comment(CommentPlan {
7790        object_id,
7791        sub_component: column_pos,
7792        comment,
7793    }))
7794}
7795
7796pub(crate) fn resolve_cluster<'a>(
7797    scx: &'a StatementContext,
7798    name: &'a Ident,
7799    if_exists: bool,
7800) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7801    match scx.resolve_cluster(Some(name)) {
7802        Ok(cluster) => Ok(Some(cluster)),
7803        Err(_) if if_exists => Ok(None),
7804        Err(e) => Err(e),
7805    }
7806}
7807
7808pub(crate) fn resolve_cluster_replica<'a>(
7809    scx: &'a StatementContext,
7810    name: &QualifiedReplica,
7811    if_exists: bool,
7812) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7813    match scx.resolve_cluster(Some(&name.cluster)) {
7814        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7815            Some(replica_id) => Ok(Some((cluster, *replica_id))),
7816            None if if_exists => Ok(None),
7817            None => Err(sql_err!(
7818                "CLUSTER {} has no CLUSTER REPLICA named {}",
7819                cluster.name(),
7820                name.replica.as_str().quoted(),
7821            )),
7822        },
7823        Err(_) if if_exists => Ok(None),
7824        Err(e) => Err(e),
7825    }
7826}
7827
7828pub(crate) fn resolve_database<'a>(
7829    scx: &'a StatementContext,
7830    name: &'a UnresolvedDatabaseName,
7831    if_exists: bool,
7832) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7833    match scx.resolve_database(name) {
7834        Ok(database) => Ok(Some(database)),
7835        Err(_) if if_exists => Ok(None),
7836        Err(e) => Err(e),
7837    }
7838}
7839
7840pub(crate) fn resolve_schema<'a>(
7841    scx: &'a StatementContext,
7842    name: UnresolvedSchemaName,
7843    if_exists: bool,
7844) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7845    match scx.resolve_schema(name) {
7846        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7847        Err(_) if if_exists => Ok(None),
7848        Err(e) => Err(e),
7849    }
7850}
7851
7852pub(crate) fn resolve_network_policy<'a>(
7853    scx: &'a StatementContext,
7854    name: Ident,
7855    if_exists: bool,
7856) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7857    match scx.catalog.resolve_network_policy(&name.to_string()) {
7858        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7859            id: policy.id(),
7860            name: policy.name().to_string(),
7861        })),
7862        Err(_) if if_exists => Ok(None),
7863        Err(e) => Err(e.into()),
7864    }
7865}
7866
7867pub(crate) fn resolve_item_or_type<'a>(
7868    scx: &'a StatementContext,
7869    object_type: ObjectType,
7870    name: UnresolvedItemName,
7871    if_exists: bool,
7872) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7873    let name = normalize::unresolved_item_name(name)?;
7874    let catalog_item = match object_type {
7875        ObjectType::Type => scx.catalog.resolve_type(&name),
7876        ObjectType::Table
7877        | ObjectType::View
7878        | ObjectType::MaterializedView
7879        | ObjectType::Source
7880        | ObjectType::Sink
7881        | ObjectType::Index
7882        | ObjectType::Role
7883        | ObjectType::Cluster
7884        | ObjectType::ClusterReplica
7885        | ObjectType::Secret
7886        | ObjectType::Connection
7887        | ObjectType::Database
7888        | ObjectType::Schema
7889        | ObjectType::Func
7890        | ObjectType::NetworkPolicy => scx.catalog.resolve_item(&name),
7891    };
7892
7893    match catalog_item {
7894        Ok(item) => {
7895            let is_type = ObjectType::from(item.item_type());
7896            if object_type == is_type {
7897                Ok(Some(item))
7898            } else {
7899                Err(PlanError::MismatchedObjectType {
7900                    name: scx.catalog.minimal_qualification(item.name()),
7901                    is_type,
7902                    expected_type: object_type,
7903                })
7904            }
7905        }
7906        Err(_) if if_exists => Ok(None),
7907        Err(e) => Err(e.into()),
7908    }
7909}
7910
7911/// Returns an error if the given cluster is a managed cluster
7912fn ensure_cluster_is_not_managed(
7913    scx: &StatementContext,
7914    cluster_id: ClusterId,
7915) -> Result<(), PlanError> {
7916    let cluster = scx.catalog.get_cluster(cluster_id);
7917    if cluster.is_managed() {
7918        Err(PlanError::ManagedCluster {
7919            cluster_name: cluster.name().to_string(),
7920        })
7921    } else {
7922        Ok(())
7923    }
7924}