Skip to main content

mz_sql/plan/statement/
ddl.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Data definition language (DDL).
11//!
12//! This module houses the handlers for statements that modify the catalog, like
13//! `ALTER`, `CREATE`, and `DROP`.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Write;
17use std::iter;
18use std::num::NonZeroU32;
19use std::time::Duration;
20
21use itertools::Itertools;
22use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
23use mz_arrow_util::builder::ArrowBuilder;
24use mz_auth::password::Password;
25use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
26use mz_expr::{CollectionPlan, UnmaterializableFunc};
27use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
28use mz_ore::cast::{CastFrom, TryCastFrom};
29use mz_ore::collections::{CollectionExt, HashSet};
30use mz_ore::num::NonNeg;
31use mz_ore::str::StrExt;
32use mz_ore::{soft_assert_or_log, soft_panic_or_log};
33use mz_proto::RustType;
34use mz_repr::adt::interval::Interval;
35use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
36use mz_repr::network_policy_id::NetworkPolicyId;
37use mz_repr::optimize::OptimizerFeatureOverrides;
38use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
39use mz_repr::role_id::RoleId;
40use mz_repr::{
41    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
42    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
43    preserves_order, strconv,
44};
45use mz_sql_parser::ast::{
46    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
47    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
48    AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
49    AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
50    AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
51    AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
52    AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
53    AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
54    AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
55    ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
56    ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
57    ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
58    ConnectionOption, ConnectionOptionName, CreateClusterReplicaStatement, CreateClusterStatement,
59    CreateConnectionOption, CreateConnectionOptionName, CreateConnectionStatement,
60    CreateConnectionType, CreateDatabaseStatement, CreateIndexStatement,
61    CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
62    CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
63    CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
64    CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
65    CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
66    CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
67    CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
68    CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
69    CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
70    DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
71    FormatSpecifier, GlueAvroOption, GlueAvroOptionName, IcebergSinkConfigOption, Ident,
72    IfExistsBehavior, IndexOption, IndexOptionName, KafkaSinkConfigOption, KeyConstraint,
73    LoadGeneratorOption, LoadGeneratorOptionName, MaterializedViewOption,
74    MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName, NetworkPolicyOption,
75    NetworkPolicyOptionName, NetworkPolicyRuleDefinition, NetworkPolicyRuleOption,
76    NetworkPolicyRuleOptionName, PgConfigOption, PgConfigOptionName, ProtobufSchema,
77    QualifiedReplica, RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue,
78    ReplicaDefinition, ReplicaOption, ReplicaOptionName, RoleAttribute, SetRoleVar,
79    SourceErrorPolicy, SourceIncludeMetadata, SqlServerConfigOption, SqlServerConfigOptionName,
80    Statement, TableConstraint, TableFromSourceColumns, TableFromSourceOption,
81    TableFromSourceOptionName, TableOption, TableOptionName, UnresolvedDatabaseName,
82    UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, ViewDefinition,
83    WithOptionValue,
84};
85use mz_sql_parser::ident;
86use mz_sql_parser::parser::StatementParseResult;
87use mz_storage_types::connections::inline::ReferencedConnection;
88use mz_storage_types::connections::{Connection, KafkaTopicOptions};
89use mz_storage_types::sinks::{
90    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
91    SinkEnvelope, StorageSinkConnection, iceberg_type_overrides,
92};
93use mz_storage_types::sources::encoding::{
94    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
95    SourceDataEncoding, included_column_desc,
96};
97use mz_storage_types::sources::envelope::{
98    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
99};
100use mz_storage_types::sources::kafka::{
101    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
102};
103use mz_storage_types::sources::load_generator::{
104    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
105    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
106};
107use mz_storage_types::sources::mysql::{
108    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
109};
110use mz_storage_types::sources::postgres::{
111    PostgresSourceConnection, PostgresSourcePublicationDetails,
112    ProtoPostgresSourcePublicationDetails,
113};
114use mz_storage_types::sources::sql_server::{
115    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
116};
117use mz_storage_types::sources::{
118    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
119    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
120    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
121    SqlServerSourceExtras, Timeline,
122};
123use mz_storage_types::wire_format::WireFormat;
124use prost::Message;
125
126use crate::ast::display::AstDisplay;
127use crate::catalog::{
128    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
129    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
130};
131use crate::iceberg::IcebergSinkConfigOptionExtracted;
132use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
133use crate::names::{
134    Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
135    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
136    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
137};
138use crate::normalize::{self, ident};
139use crate::plan::error::PlanError;
140use crate::plan::query::{
141    ExprContext, QueryLifetime, plan_expr, scalar_type_from_catalog, scalar_type_from_sql,
142};
143use crate::plan::scope::Scope;
144use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
145use crate::plan::statement::{StatementContext, StatementDesc, scl};
146use crate::plan::typeconv::CastContext;
147use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
148use crate::plan::{
149    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
150    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
151    AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
152    AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
153    AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
154    AlterSourceTimestampIntervalPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
155    AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
156    ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
157    CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
158    CreateConnectionPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
159    CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
160    CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
161    DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
162    NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
163    PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
164    VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
165    WebhookValidation, literal, plan_utils, query, transform_ast,
166};
167use crate::session::vars::{
168    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
169    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
170    ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS, VarInput,
171};
172use crate::{names, parse};
173
174mod connection;
175
176// TODO: Figure out what the maximum number of columns we can actually support is, and set that.
177//
178// The real max is probably higher than this, but it's easier to relax a constraint than make it
179// more strict.
180const MAX_NUM_COLUMNS: usize = 256;
181
/// Largest Kafka topic metadata refresh interval this module works with: one hour.
///
/// NOTE(review): presumably the upper bound enforced on a user-supplied
/// `TOPIC METADATA REFRESH INTERVAL` option; the use site is outside this chunk.
const MAX_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL: Duration = Duration::from_secs(3_600);
/// Smallest Kafka topic metadata refresh interval this module works with: one second.
///
/// NOTE(review): presumably the matching lower bound; the use site is outside this chunk.
const MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL: Duration = Duration::from_secs(1);
184
185static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
186    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
187
188/// Given a relation desc and a column list, checks that:
189/// - the column list is a prefix of the desc;
190/// - all the listed columns are types that have meaningful Persist-level ordering.
191fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
192    if partition_by.len() > desc.len() {
193        tracing::error!(
194            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
195            desc.len(),
196            partition_by.len()
197        );
198        partition_by.truncate(desc.len());
199    }
200
201    let desc_prefix = desc.iter().take(partition_by.len());
202    for (idx, ((desc_name, desc_type), partition_name)) in
203        desc_prefix.zip_eq(partition_by).enumerate()
204    {
205        let partition_name = normalize::column_name(partition_name);
206        if *desc_name != partition_name {
207            sql_bail!(
208                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
209            );
210        }
211        if !preserves_order(&desc_type.scalar_type) {
212            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
213        }
214    }
215    Ok(())
216}
217
218pub fn describe_create_database(
219    _: &StatementContext,
220    _: CreateDatabaseStatement,
221) -> Result<StatementDesc, PlanError> {
222    Ok(StatementDesc::new(None))
223}
224
225pub fn plan_create_database(
226    _: &StatementContext,
227    CreateDatabaseStatement {
228        name,
229        if_not_exists,
230    }: CreateDatabaseStatement,
231) -> Result<Plan, PlanError> {
232    Ok(Plan::CreateDatabase(CreateDatabasePlan {
233        name: normalize::ident(name.0),
234        if_not_exists,
235    }))
236}
237
238pub fn describe_create_schema(
239    _: &StatementContext,
240    _: CreateSchemaStatement,
241) -> Result<StatementDesc, PlanError> {
242    Ok(StatementDesc::new(None))
243}
244
245pub fn plan_create_schema(
246    scx: &StatementContext,
247    CreateSchemaStatement {
248        mut name,
249        if_not_exists,
250    }: CreateSchemaStatement,
251) -> Result<Plan, PlanError> {
252    if name.0.len() > 2 {
253        sql_bail!("schema name {} has more than two components", name);
254    }
255    let schema_name = normalize::ident(
256        name.0
257            .pop()
258            .expect("names always have at least one component"),
259    );
260    let database_spec = match name.0.pop() {
261        None => match scx.catalog.active_database() {
262            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
263            None => sql_bail!("no database specified and no active database"),
264        },
265        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
266            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
267            Err(_) => sql_bail!("invalid database {}", n.as_str()),
268        },
269    };
270    Ok(Plan::CreateSchema(CreateSchemaPlan {
271        database_spec,
272        schema_name,
273        if_not_exists,
274    }))
275}
276
277pub fn describe_create_table(
278    _: &StatementContext,
279    _: CreateTableStatement<Aug>,
280) -> Result<StatementDesc, PlanError> {
281    Ok(StatementDesc::new(None))
282}
283
284pub fn plan_create_table(
285    scx: &StatementContext,
286    stmt: CreateTableStatement<Aug>,
287) -> Result<Plan, PlanError> {
288    let CreateTableStatement {
289        name,
290        columns,
291        constraints,
292        if_not_exists,
293        temporary,
294        with_options,
295    } = &stmt;
296
297    let names: Vec<_> = columns
298        .iter()
299        .filter(|c| {
300            // This set of `names` is used to create the initial RelationDesc.
301            // Columns that have been added at later versions of the table will
302            // get added further below.
303            let is_versioned = c
304                .options
305                .iter()
306                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
307            !is_versioned
308        })
309        .map(|c| normalize::column_name(c.name.clone()))
310        .collect();
311
312    if let Some(dup) = names.iter().duplicates().next() {
313        sql_bail!("column {} specified more than once", dup.quoted());
314    }
315
316    // Build initial relation type that handles declared data types
317    // and NOT NULL constraints.
318    let mut column_types = Vec::with_capacity(columns.len());
319    let mut defaults = Vec::with_capacity(columns.len());
320    let mut changes = BTreeMap::new();
321    let mut keys = Vec::new();
322
323    for (i, c) in columns.into_iter().enumerate() {
324        let aug_data_type = &c.data_type;
325        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
326        let mut nullable = true;
327        let mut default = Expr::null();
328        let mut versioned = false;
329        for option in &c.options {
330            match &option.option {
331                ColumnOption::NotNull => nullable = false,
332                ColumnOption::Default(expr) => {
333                    // Ensure expression can be planned and yields the correct
334                    // type.
335                    let mut expr = expr.clone();
336                    transform_ast::transform(scx, &mut expr)?;
337                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
338                    default = expr.clone();
339                }
340                ColumnOption::Unique { is_primary } => {
341                    keys.push(vec![i]);
342                    if *is_primary {
343                        nullable = false;
344                    }
345                }
346                ColumnOption::Versioned { action, version } => {
347                    let version = RelationVersion::from(*version);
348                    versioned = true;
349
350                    let name = normalize::column_name(c.name.clone());
351                    let typ = ty.clone().nullable(nullable);
352
353                    changes.insert(version, (action.clone(), name, typ));
354                }
355                other => {
356                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
357                }
358            }
359        }
360        // TODO(alter_table): This assumes all versioned columns are at the
361        // end. This will no longer be true when we support dropping columns.
362        if !versioned {
363            column_types.push(ty.nullable(nullable));
364        }
365        defaults.push(default);
366    }
367
368    let mut seen_primary = false;
369    'c: for constraint in constraints {
370        match constraint {
371            TableConstraint::Unique {
372                name: _,
373                columns,
374                is_primary,
375                nulls_not_distinct,
376            } => {
377                if seen_primary && *is_primary {
378                    sql_bail!(
379                        "multiple primary keys for table {} are not allowed",
380                        name.to_ast_string_stable()
381                    );
382                }
383                seen_primary = *is_primary || seen_primary;
384
385                let mut key = vec![];
386                for column in columns {
387                    let column = normalize::column_name(column.clone());
388                    match names.iter().position(|name| *name == column) {
389                        None => sql_bail!("unknown column in constraint: {}", column),
390                        Some(i) => {
391                            let nullable = &mut column_types[i].nullable;
392                            if *is_primary {
393                                if *nulls_not_distinct {
394                                    sql_bail!(
395                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
396                                    );
397                                }
398
399                                *nullable = false;
400                            } else if !(*nulls_not_distinct || !*nullable) {
401                                // Non-primary key unique constraints are only keys if all of their
402                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
403                                break 'c;
404                            }
405
406                            key.push(i);
407                        }
408                    }
409                }
410
411                if *is_primary {
412                    keys.insert(0, key);
413                } else {
414                    keys.push(key);
415                }
416            }
417            TableConstraint::ForeignKey { .. } => {
418                // Foreign key constraints are not presently enforced. We allow
419                // them with feature flags for sqllogictest's sake.
420                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
421            }
422            TableConstraint::Check { .. } => {
423                // Check constraints are not presently enforced. We allow them
424                // with feature flags for sqllogictest's sake.
425                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
426            }
427        }
428    }
429
430    if !keys.is_empty() {
431        // Unique constraints are not presently enforced. We allow them with feature flags for
432        // sqllogictest's sake.
433        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
434    }
435
436    let typ = SqlRelationType::new(column_types).with_keys(keys);
437
438    let temporary = *temporary;
439    let name = if temporary {
440        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
441    } else {
442        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
443    };
444
445    // Check for an object in the catalog with this same name
446    let full_name = scx.catalog.resolve_full_name(&name);
447    let partial_name = PartialItemName::from(full_name.clone());
448    // For PostgreSQL compatibility, we need to prevent creating tables when
449    // there is an existing object *or* type of the same name.
450    if let (false, Ok(item)) = (
451        if_not_exists,
452        scx.catalog.resolve_item_or_type(&partial_name),
453    ) {
454        return Err(PlanError::ItemAlreadyExists {
455            name: full_name.to_string(),
456            item_type: item.item_type(),
457        });
458    }
459
460    let desc = RelationDesc::new(typ, names);
461    let mut desc = VersionedRelationDesc::new(desc);
462    for (version, (_action, name, typ)) in changes.into_iter() {
463        let new_version = desc.add_column(name, typ);
464        if version != new_version {
465            return Err(PlanError::InvalidTable {
466                name: full_name.item,
467            });
468        }
469    }
470
471    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;
472
473    // Table options should only consider the original columns, since those
474    // were the only ones in scope when the table was created.
475    //
476    // TODO(alter_table): Will need to reconsider this when we support ALTERing
477    // the PARTITION BY columns.
478    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
479    let options = plan_table_options(scx, &original_desc, with_options.clone())?;
480
481    let compaction_window = options.iter().find_map(|o| {
482        #[allow(irrefutable_let_patterns)]
483        if let crate::plan::TableOption::RetainHistory(lcw) = o {
484            Some(lcw.clone())
485        } else {
486            None
487        }
488    });
489
490    let table = Table {
491        create_sql,
492        desc,
493        temporary,
494        compaction_window,
495        data_source: TableDataSource::TableWrites { defaults },
496    };
497    Ok(Plan::CreateTable(CreateTablePlan {
498        name,
499        table,
500        if_not_exists: *if_not_exists,
501    }))
502}
503
504pub fn describe_create_table_from_source(
505    _: &StatementContext,
506    _: CreateTableFromSourceStatement<Aug>,
507) -> Result<StatementDesc, PlanError> {
508    Ok(StatementDesc::new(None))
509}
510
511pub fn describe_create_webhook_source(
512    _: &StatementContext,
513    _: CreateWebhookSourceStatement<Aug>,
514) -> Result<StatementDesc, PlanError> {
515    Ok(StatementDesc::new(None))
516}
517
518pub fn describe_create_source(
519    _: &StatementContext,
520    _: CreateSourceStatement<Aug>,
521) -> Result<StatementDesc, PlanError> {
522    Ok(StatementDesc::new(None))
523}
524
525pub fn describe_create_subsource(
526    _: &StatementContext,
527    _: CreateSubsourceStatement<Aug>,
528) -> Result<StatementDesc, PlanError> {
529    Ok(StatementDesc::new(None))
530}
531
// Declares the typed options recognized for `CreateSourceOption`: `TimestampInterval` as a
// `Duration` and `RetainHistory` as an `OptionalDuration`.
//
// NOTE(review): `generate_extracted_config!` is defined outside this chunk; presumably it
// generates a `CreateSourceOptionExtracted` struct with one field per listed option — confirm
// against the macro definition.
532generate_extracted_config!(
533    CreateSourceOption,
534    (TimestampInterval, Duration),
535    (RetainHistory, OptionalDuration)
536);
537
// Declares the typed options recognized for `PgConfigOption`: `Details` and `Publication` as
// `String`s, and `TextColumns` / `ExcludeColumns` as lists of unresolved item names.
//
// NOTE(review): the trailing `Default(vec![])` presumably supplies the value used when the
// option is absent (an empty list) — confirm against the `generate_extracted_config!`
// definition, which is outside this chunk.
538generate_extracted_config!(
539    PgConfigOption,
540    (Details, String),
541    (Publication, String),
542    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
543    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
544);
545
// Declares the typed options recognized for `MySqlConfigOption`: `Details` as a `String`, and
// `TextColumns` / `ExcludeColumns` as lists of unresolved item names.
//
// NOTE(review): the trailing `Default(vec![])` presumably supplies the value used when the
// option is absent (an empty list) — confirm against the `generate_extracted_config!`
// definition, which is outside this chunk.
546generate_extracted_config!(
547    MySqlConfigOption,
548    (Details, String),
549    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
550    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
551);
552
// Declares the typed options recognized for `SqlServerConfigOption`: `Details` as a `String`,
// and `TextColumns` / `ExcludeColumns` as lists of unresolved item names.
//
// NOTE(review): the trailing `Default(vec![])` presumably supplies the value used when the
// option is absent (an empty list) — confirm against the `generate_extracted_config!`
// definition, which is outside this chunk.
553generate_extracted_config!(
554    SqlServerConfigOption,
555    (Details, String),
556    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
557    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
558);
559
560pub fn plan_create_webhook_source(
561    scx: &StatementContext,
562    mut stmt: CreateWebhookSourceStatement<Aug>,
563) -> Result<Plan, PlanError> {
564    if stmt.is_table {
565        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
566    }
567
568    // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value
569    // we plan to normalize when we canonicalize the create statement.
570    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
571    let create_sql =
572        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;
573
574    let CreateWebhookSourceStatement {
575        name,
576        if_not_exists,
577        body_format,
578        include_headers,
579        validate_using,
580        is_table,
581        // We resolved `in_cluster` above, so we want to ignore it here.
582        in_cluster: _,
583    } = stmt;
584
585    let validate_using = validate_using
586        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
587        .transpose()?;
588    if let Some(WebhookValidation { expression, .. }) = &validate_using {
589        // If the validation expression doesn't reference any part of the request, then we should
590        // return an error because it's almost definitely wrong.
591        if !expression.contains_column() {
592            return Err(PlanError::WebhookValidationDoesNotUseColumns);
593        }
594        // Validation expressions cannot contain unmaterializable functions, except `now()`. We
595        // allow calls to `now()` because some webhook providers recommend rejecting requests that
596        // are older than a certain threshold.
597        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
598            return Err(PlanError::WebhookValidationNonDeterministic);
599        }
600    }
601
602    let body_format = match body_format {
603        Format::Bytes => WebhookBodyFormat::Bytes,
604        Format::Json { array } => WebhookBodyFormat::Json { array },
605        Format::Text => WebhookBodyFormat::Text,
606        // TODO(parkmycar): Make an issue to support more types, or change this to NeverSupported.
607        ty => {
608            return Err(PlanError::Unsupported {
609                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
610                discussion_no: None,
611            });
612        }
613    };
614
615    let mut column_ty = vec![
616        // Always include the body of the request as the first column.
617        SqlColumnType {
618            scalar_type: SqlScalarType::from(body_format),
619            nullable: false,
620        },
621    ];
622    let mut column_names = vec!["body".to_string()];
623
624    let mut headers = WebhookHeaders::default();
625
626    // Include a `headers` column, possibly filtered.
627    if let Some(filters) = include_headers.column {
628        column_ty.push(SqlColumnType {
629            scalar_type: SqlScalarType::Map {
630                value_type: Box::new(SqlScalarType::String),
631                custom_id: None,
632            },
633            nullable: false,
634        });
635        column_names.push("headers".to_string());
636
637        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
638            filters.into_iter().partition_map(|filter| {
639                if filter.block {
640                    itertools::Either::Right(filter.header_name)
641                } else {
642                    itertools::Either::Left(filter.header_name)
643                }
644            });
645        headers.header_column = Some(WebhookHeaderFilters { allow, block });
646    }
647
648    // Map headers to specific columns.
649    for header in include_headers.mappings {
650        let scalar_type = header
651            .use_bytes
652            .then_some(SqlScalarType::Bytes)
653            .unwrap_or(SqlScalarType::String);
654        column_ty.push(SqlColumnType {
655            scalar_type,
656            nullable: true,
657        });
658        column_names.push(header.column_name.into_string());
659
660        let column_idx = column_ty.len() - 1;
661        // Double check we're consistent with column names.
662        assert_eq!(
663            column_idx,
664            column_names.len() - 1,
665            "header column names and types don't match"
666        );
667        headers
668            .mapped_headers
669            .insert(column_idx, (header.header_name, header.use_bytes));
670    }
671
672    // Validate our columns.
673    let mut unique_check = HashSet::with_capacity(column_names.len());
674    for name in &column_names {
675        if !unique_check.insert(name) {
676            return Err(PlanError::AmbiguousColumn(name.clone().into()));
677        }
678    }
679    if column_names.len() > MAX_NUM_COLUMNS {
680        return Err(PlanError::TooManyColumns {
681            max_num_columns: MAX_NUM_COLUMNS,
682            req_num_columns: column_names.len(),
683        });
684    }
685
686    let typ = SqlRelationType::new(column_ty);
687    let desc = RelationDesc::new(typ, column_names);
688
689    // Check for an object in the catalog with this same name
690    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
691    let full_name = scx.catalog.resolve_full_name(&name);
692    let partial_name = PartialItemName::from(full_name.clone());
693    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
694        return Err(PlanError::ItemAlreadyExists {
695            name: full_name.to_string(),
696            item_type: item.item_type(),
697        });
698    }
699
700    // Note(parkmycar): We don't currently support specifying a timeline for Webhook sources. As
701    // such, we always use a default of EpochMilliseconds.
702    let timeline = Timeline::EpochMilliseconds;
703
704    let plan = if is_table {
705        let data_source = DataSourceDesc::Webhook {
706            validate_using,
707            body_format,
708            headers,
709            cluster_id: Some(in_cluster.id()),
710        };
711        let data_source = TableDataSource::DataSource {
712            desc: data_source,
713            timeline,
714        };
715        Plan::CreateTable(CreateTablePlan {
716            name,
717            if_not_exists,
718            table: Table {
719                create_sql,
720                desc: VersionedRelationDesc::new(desc),
721                temporary: false,
722                compaction_window: None,
723                data_source,
724            },
725        })
726    } else {
727        let data_source = DataSourceDesc::Webhook {
728            validate_using,
729            body_format,
730            headers,
731            // Important: The cluster is set at the `Source` level.
732            cluster_id: None,
733        };
734        Plan::CreateSource(CreateSourcePlan {
735            name,
736            source: Source {
737                create_sql,
738                data_source,
739                desc,
740                compaction_window: None,
741            },
742            if_not_exists,
743            timeline,
744            in_cluster: Some(in_cluster.id()),
745        })
746    };
747
748    Ok(plan)
749}
750
751pub fn plan_create_source(
752    scx: &StatementContext,
753    mut stmt: CreateSourceStatement<Aug>,
754) -> Result<Plan, PlanError> {
755    let CreateSourceStatement {
756        name,
757        in_cluster: _,
758        col_names,
759        connection: source_connection,
760        envelope,
761        if_not_exists,
762        format,
763        key_constraint,
764        include_metadata,
765        with_options,
766        external_references: referenced_subsources,
767        progress_subsource,
768    } = &stmt;
769
770    mz_ore::soft_assert_or_log!(
771        referenced_subsources.is_none(),
772        "referenced subsources must be cleared in purification"
773    );
774
775    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
776        && scx.catalog.system_vars().force_source_table_syntax();
777
778    // If the new source table syntax is forced all the options related to the primary
779    // source output should be un-set.
780    if force_source_table_syntax {
781        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
782            Err(PlanError::UseTablesForSources(
783                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
784            ))?;
785        }
786    }
787
788    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
789
790    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
791        && include_metadata
792            .iter()
793            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
794    {
795        // TODO(guswynn): should this be `bail_unsupported!`?
796        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
797    }
798    if !matches!(
799        source_connection,
800        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
801    ) && !include_metadata.is_empty()
802    {
803        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
804    }
805
806    if !include_metadata.is_empty()
807        && !matches!(
808            envelope,
809            ast::SourceEnvelope::Upsert { .. }
810                | ast::SourceEnvelope::None
811                | ast::SourceEnvelope::Debezium
812        )
813    {
814        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
815    }
816
817    let external_connection =
818        plan_generic_source_connection(scx, source_connection, include_metadata)?;
819
820    let CreateSourceOptionExtracted {
821        timestamp_interval,
822        retain_history,
823        seen: _,
824    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;
825
826    let metadata_columns_desc = match external_connection {
827        GenericSourceConnection::Kafka(KafkaSourceConnection {
828            ref metadata_columns,
829            ..
830        }) => kafka_metadata_columns_desc(metadata_columns),
831        _ => vec![],
832    };
833
834    // Generate the relation description for the primary export of the source.
835    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
836        scx,
837        &envelope,
838        format,
839        Some(external_connection.default_key_desc()),
840        external_connection.default_value_desc(),
841        include_metadata,
842        metadata_columns_desc,
843        &external_connection,
844    )?;
845    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;
846
847    let names: Vec<_> = desc.iter_names().cloned().collect();
848    if let Some(dup) = names.iter().duplicates().next() {
849        sql_bail!("column {} specified more than once", dup.quoted());
850    }
851
852    // Apply user-specified key constraint
853    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
854        // Don't remove this without addressing
855        // https://github.com/MaterializeInc/database-issues/issues/4371.
856        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;
857
858        let key_columns = columns
859            .into_iter()
860            .map(normalize::column_name)
861            .collect::<Vec<_>>();
862
863        let mut uniq = BTreeSet::new();
864        for col in key_columns.iter() {
865            if !uniq.insert(col) {
866                sql_bail!("Repeated column name in source key constraint: {}", col);
867            }
868        }
869
870        let key_indices = key_columns
871            .iter()
872            .map(|col| {
873                let name_idx = desc
874                    .get_by_name(col)
875                    .map(|(idx, _type)| idx)
876                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
877                if desc.get_unambiguous_name(name_idx).is_none() {
878                    sql_bail!("Ambiguous column in source key constraint: {}", col);
879                }
880                Ok(name_idx)
881            })
882            .collect::<Result<Vec<_>, _>>()?;
883
884        if !desc.typ().keys.is_empty() {
885            return Err(key_constraint_err(&desc, &key_columns));
886        } else {
887            desc = desc.with_key(key_indices);
888        }
889    }
890
891    let timestamp_interval = match timestamp_interval {
892        Some(duration) => {
893            // Only validate bounds for new statements (pcx is Some), not during
894            // catalog deserialization (pcx is None). Previously persisted sources
895            // may have intervals that no longer fall within the current bounds.
896            if scx.pcx.is_some() {
897                let min = scx.catalog.system_vars().min_timestamp_interval();
898                let max = scx.catalog.system_vars().max_timestamp_interval();
899                if duration < min || duration > max {
900                    return Err(PlanError::InvalidTimestampInterval {
901                        min,
902                        max,
903                        requested: duration,
904                    });
905                }
906            }
907            duration
908        }
909        None => scx.catalog.system_vars().default_timestamp_interval(),
910    };
911
912    let (desc, data_source) = match progress_subsource {
913        Some(name) => {
914            let DeferredItemName::Named(name) = name else {
915                sql_bail!("[internal error] progress subsource must be named during purification");
916            };
917            let ResolvedItemName::Item { id, .. } = name else {
918                sql_bail!("[internal error] invalid target id");
919            };
920
921            let details = match external_connection {
922                GenericSourceConnection::Kafka(ref c) => {
923                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
924                        metadata_columns: c.metadata_columns.clone(),
925                    })
926                }
927                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
928                    LoadGenerator::Auction
929                    | LoadGenerator::Marketing
930                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
931                    LoadGenerator::Counter { .. }
932                    | LoadGenerator::Clock
933                    | LoadGenerator::Datums
934                    | LoadGenerator::KeyValue(_) => {
935                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
936                            output: LoadGeneratorOutput::Default,
937                        })
938                    }
939                },
940                GenericSourceConnection::Postgres(_)
941                | GenericSourceConnection::MySql(_)
942                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
943            };
944
945            let data_source = DataSourceDesc::OldSyntaxIngestion {
946                desc: SourceDesc {
947                    connection: external_connection,
948                    timestamp_interval,
949                },
950                progress_subsource: *id,
951                data_config: SourceExportDataConfig {
952                    encoding,
953                    envelope: envelope.clone(),
954                },
955                details,
956            };
957            (desc, data_source)
958        }
959        None => {
960            let desc = external_connection.timestamp_desc();
961            let data_source = DataSourceDesc::Ingestion(SourceDesc {
962                connection: external_connection,
963                timestamp_interval,
964            });
965            (desc, data_source)
966        }
967    };
968
969    let if_not_exists = *if_not_exists;
970    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
971
972    // Check for an object in the catalog with this same name
973    let full_name = scx.catalog.resolve_full_name(&name);
974    let partial_name = PartialItemName::from(full_name.clone());
975    // For PostgreSQL compatibility, we need to prevent creating sources when
976    // there is an existing object *or* type of the same name.
977    if let (false, Ok(item)) = (
978        if_not_exists,
979        scx.catalog.resolve_item_or_type(&partial_name),
980    ) {
981        return Err(PlanError::ItemAlreadyExists {
982            name: full_name.to_string(),
983            item_type: item.item_type(),
984        });
985    }
986
987    // We will rewrite the cluster if one is not provided, so we must use the
988    // `in_cluster` value we plan to normalize when we canonicalize the create
989    // statement.
990    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
991
992    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;
993
994    // Determine a default timeline for the source.
995    let timeline = match envelope {
996        SourceEnvelope::CdcV2 => {
997            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
998        }
999        _ => Timeline::EpochMilliseconds,
1000    };
1001
1002    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1003
1004    let source = Source {
1005        create_sql,
1006        data_source,
1007        desc,
1008        compaction_window,
1009    };
1010
1011    Ok(Plan::CreateSource(CreateSourcePlan {
1012        name,
1013        source,
1014        if_not_exists,
1015        timeline,
1016        in_cluster: Some(in_cluster.id()),
1017    }))
1018}
1019
1020pub fn plan_generic_source_connection(
1021    scx: &StatementContext<'_>,
1022    source_connection: &CreateSourceConnection<Aug>,
1023    include_metadata: &Vec<SourceIncludeMetadata>,
1024) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
1025    Ok(match source_connection {
1026        CreateSourceConnection::Kafka {
1027            connection,
1028            options,
1029        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
1030            scx,
1031            connection,
1032            options,
1033            include_metadata,
1034        )?),
1035        CreateSourceConnection::Postgres {
1036            connection,
1037            options,
1038        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
1039            scx, connection, options,
1040        )?),
1041        CreateSourceConnection::SqlServer {
1042            connection,
1043            options,
1044        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
1045            scx, connection, options,
1046        )?),
1047        CreateSourceConnection::MySql {
1048            connection,
1049            options,
1050        } => {
1051            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
1052        }
1053        CreateSourceConnection::LoadGenerator { generator, options } => {
1054            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
1055                scx,
1056                generator,
1057                options,
1058                include_metadata,
1059            )?)
1060        }
1061    })
1062}
1063
1064fn plan_load_generator_source_connection(
1065    scx: &StatementContext<'_>,
1066    generator: &ast::LoadGenerator,
1067    options: &Vec<LoadGeneratorOption<Aug>>,
1068    include_metadata: &Vec<SourceIncludeMetadata>,
1069) -> Result<LoadGeneratorSourceConnection, PlanError> {
1070    let load_generator =
1071        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
1072    let LoadGeneratorOptionExtracted {
1073        tick_interval,
1074        as_of,
1075        up_to,
1076        ..
1077    } = options.clone().try_into()?;
1078    let tick_micros = match tick_interval {
1079        Some(interval) => Some(interval.as_micros().try_into()?),
1080        None => None,
1081    };
1082    if up_to < as_of {
1083        sql_bail!("UP TO cannot be less than AS OF");
1084    }
1085    Ok(LoadGeneratorSourceConnection {
1086        load_generator,
1087        tick_micros,
1088        as_of,
1089        up_to,
1090    })
1091}
1092
1093fn plan_mysql_source_connection(
1094    scx: &StatementContext<'_>,
1095    connection: &ResolvedItemName,
1096    options: &Vec<MySqlConfigOption<Aug>>,
1097) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
1098    let connection_item = scx.get_item_by_resolved_name(connection)?;
1099    match connection_item.connection()? {
1100        Connection::MySql(connection) => connection,
1101        _ => sql_bail!(
1102            "{} is not a MySQL connection",
1103            scx.catalog.resolve_full_name(connection_item.name())
1104        ),
1105    };
1106    let MySqlConfigOptionExtracted {
1107        details,
1108        // text/exclude columns are already part of the source-exports and are only included
1109        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1110        // be removed once we drop support for implicitly created subsources.
1111        text_columns: _,
1112        exclude_columns: _,
1113        seen: _,
1114    } = options.clone().try_into()?;
1115    let details = details
1116        .as_ref()
1117        .ok_or_else(|| internal_err!("MySQL source missing details"))?;
1118    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1119    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1120    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1121    Ok(MySqlSourceConnection {
1122        connection: connection_item.id(),
1123        connection_id: connection_item.id(),
1124        details,
1125    })
1126}
1127
1128fn plan_sqlserver_source_connection(
1129    scx: &StatementContext<'_>,
1130    connection: &ResolvedItemName,
1131    options: &Vec<SqlServerConfigOption<Aug>>,
1132) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
1133    let connection_item = scx.get_item_by_resolved_name(connection)?;
1134    match connection_item.connection()? {
1135        Connection::SqlServer(connection) => connection,
1136        _ => sql_bail!(
1137            "{} is not a SQL Server connection",
1138            scx.catalog.resolve_full_name(connection_item.name())
1139        ),
1140    };
1141    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
1142    let details = details
1143        .as_ref()
1144        .ok_or_else(|| internal_err!("SQL Server source missing details"))?;
1145    let extras = hex::decode(details)
1146        .map_err(|e| sql_err!("{e}"))
1147        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
1148        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
1149    Ok(SqlServerSourceConnection {
1150        connection_id: connection_item.id(),
1151        connection: connection_item.id(),
1152        extras,
1153    })
1154}
1155
1156fn plan_postgres_source_connection(
1157    scx: &StatementContext<'_>,
1158    connection: &ResolvedItemName,
1159    options: &Vec<PgConfigOption<Aug>>,
1160) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
1161    let connection_item = scx.get_item_by_resolved_name(connection)?;
1162    let PgConfigOptionExtracted {
1163        details,
1164        publication,
1165        // text columns are already part of the source-exports and are only included
1166        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1167        // be removed once we drop support for implicitly created subsources.
1168        text_columns: _,
1169        // exclude columns are already part of the source-exports and are only included
1170        // in these options for round-tripping of a `CREATE SOURCE` statement. This should
1171        // be removed once we drop support for implicitly created subsources.
1172        exclude_columns: _,
1173        seen: _,
1174    } = options.clone().try_into()?;
1175    let details = details
1176        .as_ref()
1177        .ok_or_else(|| internal_err!("Postgres source missing details"))?;
1178    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1179    let details =
1180        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1181    let publication_details =
1182        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1183    Ok(PostgresSourceConnection {
1184        connection: connection_item.id(),
1185        connection_id: connection_item.id(),
1186        // Validated during purification.
1187        publication: publication.ok_or_else(|| internal_err!("PUBLICATION option is required"))?,
1188        publication_details,
1189    })
1190}
1191
1192fn plan_kafka_source_connection(
1193    scx: &StatementContext<'_>,
1194    connection_name: &ResolvedItemName,
1195    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
1196    include_metadata: &Vec<SourceIncludeMetadata>,
1197) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
1198    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
1199    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
1200        sql_bail!(
1201            "{} is not a kafka connection",
1202            scx.catalog.resolve_full_name(connection_item.name())
1203        )
1204    }
1205    let KafkaSourceConfigOptionExtracted {
1206        group_id_prefix,
1207        topic,
1208        mut topic_metadata_refresh_interval,
1209        start_timestamp: _, // purified into `start_offset`
1210        start_offset,
1211        seen: _,
1212    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
1213    // Validated during purification.
1214    let topic = topic.ok_or_else(|| internal_err!("TOPIC option is required"))?;
1215    let mut start_offsets = BTreeMap::new();
1216    if let Some(offsets) = start_offset {
1217        for (part, offset) in offsets.iter().enumerate() {
1218            if *offset < 0 {
1219                sql_bail!("START OFFSET must be a nonnegative integer");
1220            }
1221            start_offsets.insert(i32::try_from(part)?, *offset);
1222        }
1223    }
1224    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
1225        // This is a librdkafka-enforced restriction that, if violated,
1226        // would result in a runtime error for the source.
1227        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
1228    }
1229    if topic_metadata_refresh_interval < MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL {
1230        // We enforce a minimum of 1 second refresh interval to prevent overloading the topic
1231        topic_metadata_refresh_interval = MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL;
1232    }
1233    let metadata_columns = include_metadata
1234        .into_iter()
1235        .flat_map(|item| match item {
1236            SourceIncludeMetadata::Timestamp { alias } => {
1237                let name = match alias {
1238                    Some(name) => name.to_string(),
1239                    None => "timestamp".to_owned(),
1240                };
1241                Some((name, KafkaMetadataKind::Timestamp))
1242            }
1243            SourceIncludeMetadata::Partition { alias } => {
1244                let name = match alias {
1245                    Some(name) => name.to_string(),
1246                    None => "partition".to_owned(),
1247                };
1248                Some((name, KafkaMetadataKind::Partition))
1249            }
1250            SourceIncludeMetadata::Offset { alias } => {
1251                let name = match alias {
1252                    Some(name) => name.to_string(),
1253                    None => "offset".to_owned(),
1254                };
1255                Some((name, KafkaMetadataKind::Offset))
1256            }
1257            SourceIncludeMetadata::Headers { alias } => {
1258                let name = match alias {
1259                    Some(name) => name.to_string(),
1260                    None => "headers".to_owned(),
1261                };
1262                Some((name, KafkaMetadataKind::Headers))
1263            }
1264            SourceIncludeMetadata::Header {
1265                alias,
1266                key,
1267                use_bytes,
1268            } => Some((
1269                alias.to_string(),
1270                KafkaMetadataKind::Header {
1271                    key: key.clone(),
1272                    use_bytes: *use_bytes,
1273                },
1274            )),
1275            SourceIncludeMetadata::Key { .. } => {
1276                // handled below
1277                None
1278            }
1279        })
1280        .collect();
1281    Ok(KafkaSourceConnection {
1282        connection: connection_item.id(),
1283        connection_id: connection_item.id(),
1284        topic,
1285        start_offsets,
1286        group_id_prefix,
1287        topic_metadata_refresh_interval,
1288        metadata_columns,
1289    })
1290}
1291
1292fn apply_source_envelope_encoding(
1293    scx: &StatementContext,
1294    envelope: &ast::SourceEnvelope,
1295    format: &Option<FormatSpecifier<Aug>>,
1296    key_desc: Option<RelationDesc>,
1297    value_desc: RelationDesc,
1298    include_metadata: &[SourceIncludeMetadata],
1299    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
1300    source_connection: &GenericSourceConnection<ReferencedConnection>,
1301) -> Result<
1302    (
1303        RelationDesc,
1304        SourceEnvelope,
1305        Option<SourceDataEncoding<ReferencedConnection>>,
1306    ),
1307    PlanError,
1308> {
1309    let encoding = match format {
1310        Some(format) => Some(get_encoding(scx, format, envelope)?),
1311        None => None,
1312    };
1313
1314    let (key_desc, value_desc) = match &encoding {
1315        Some(encoding) => {
1316            // If we are applying an encoding we need to ensure that the incoming value_desc is a
1317            // single column of type bytes.
1318            match value_desc.typ().columns() {
1319                [typ] => match typ.scalar_type {
1320                    SqlScalarType::Bytes => {}
1321                    _ => sql_bail!(
1322                        "The schema produced by the source is incompatible with format decoding"
1323                    ),
1324                },
1325                _ => sql_bail!(
1326                    "The schema produced by the source is incompatible with format decoding"
1327                ),
1328            }
1329
1330            let (key_desc, value_desc) = encoding.desc()?;
1331
1332            // TODO(petrosagg): This piece of code seems to be making a statement about the
1333            // nullability of the NONE envelope when the source is Kafka. As written, the code
1334            // misses opportunities to mark columns as not nullable and is over conservative. For
1335            // example in the case of `FORMAT BYTES ENVELOPE NONE` the output is indeed
1336            // non-nullable but we will mark it as nullable anyway. This kind of crude reasoning
1337            // should be replaced with precise type-level reasoning.
1338            let key_desc = key_desc.map(|desc| {
1339                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
1340                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
1341                if is_kafka && is_envelope_none {
1342                    RelationDesc::from_names_and_types(
1343                        desc.into_iter()
1344                            .map(|(name, typ)| (name, typ.nullable(true))),
1345                    )
1346                } else {
1347                    desc
1348                }
1349            });
1350            (key_desc, value_desc)
1351        }
1352        None => (key_desc, value_desc),
1353    };
1354
1355    // KEY VALUE load generators are the only UPSERT source that
1356    // has no encoding but defaults to `INCLUDE KEY`.
1357    //
1358    // As discussed
1359    // <https://github.com/MaterializeInc/materialize/pull/26246#issuecomment-2023558097>,
1360    // removing this special case amounts to deciding how to handle null keys
1361    // from sources, in a holistic way. We aren't yet prepared to do this, so we leave
1362    // this special case in.
1363    //
1364    // Note that this is safe because this generator is
1365    // 1. The only source with no encoding that can have its key included.
1366    // 2. Never produces null keys (or values, for that matter).
1367    let key_envelope_no_encoding = matches!(
1368        source_connection,
1369        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
1370            load_generator: LoadGenerator::KeyValue(_),
1371            ..
1372        })
1373    );
1374    let mut key_envelope = get_key_envelope(
1375        include_metadata,
1376        encoding.as_ref(),
1377        key_envelope_no_encoding,
1378    )?;
1379
1380    match (&envelope, &key_envelope) {
1381        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
1382        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
1383            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
1384        ),
1385        _ => {}
1386    };
1387
1388    // Not all source envelopes are compatible with all source connections.
1389    // Whoever constructs the source ingestion pipeline is responsible for
1390    // choosing compatible envelopes and connections.
1391    //
1392    // TODO(guswynn): ambiguously assert which connections and envelopes are
1393    // compatible in typechecking
1394    //
1395    // TODO: remove bails as more support for upsert is added.
1396    let envelope = match &envelope {
1397        // TODO: fixup key envelope
1398        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
1399        ast::SourceEnvelope::Debezium => {
1400            //TODO check that key envelope is not set
1401            let after_idx = match typecheck_debezium(&value_desc) {
1402                Ok((_before_idx, after_idx)) => Ok(after_idx),
1403                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
1404                    Some(DataEncoding::Avro(_)) => Err(type_err),
1405                    _ => Err(sql_err!(
1406                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
1407                    )),
1408                },
1409            }?;
1410
1411            UnplannedSourceEnvelope::Upsert {
1412                style: UpsertStyle::Debezium { after_idx },
1413            }
1414        }
1415        ast::SourceEnvelope::Upsert {
1416            value_decode_err_policy,
1417        } => {
1418            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
1419                None => {
1420                    if !key_envelope_no_encoding {
1421                        bail_unsupported!(format!(
1422                            "UPSERT requires a key/value format: {:?}",
1423                            format
1424                        ))
1425                    }
1426                    None
1427                }
1428                Some(key_encoding) => Some(key_encoding),
1429            };
1430            // `ENVELOPE UPSERT` implies `INCLUDE KEY`, if it is not explicitly
1431            // specified.
1432            if key_envelope == KeyEnvelope::None {
1433                key_envelope = get_unnamed_key_envelope(key_encoding)?;
1434            }
1435            // If the value decode error policy is not set we use the default upsert style.
1436            let style = match value_decode_err_policy.as_slice() {
1437                [] => UpsertStyle::Default(key_envelope),
1438                [SourceErrorPolicy::Inline { alias }] => {
1439                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
1440                    UpsertStyle::ValueErrInline {
1441                        key_envelope,
1442                        error_column: alias
1443                            .as_ref()
1444                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
1445                    }
1446                }
1447                _ => {
1448                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
1449                }
1450            };
1451
1452            UnplannedSourceEnvelope::Upsert { style }
1453        }
1454        ast::SourceEnvelope::CdcV2 => {
1455            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
1456            //TODO check that key envelope is not set
1457            match format {
1458                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
1459                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
1460            }
1461            UnplannedSourceEnvelope::CdcV2
1462        }
1463    };
1464
1465    let metadata_desc = included_column_desc(metadata_columns_desc);
1466    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;
1467
1468    Ok((desc, envelope, encoding))
1469}
1470
1471/// Plans the RelationDesc for a source export (subsource or table) that has a defined list
1472/// of columns and constraints.
1473fn plan_source_export_desc(
1474    scx: &StatementContext,
1475    name: &UnresolvedItemName,
1476    columns: &Vec<ColumnDef<Aug>>,
1477    constraints: &Vec<TableConstraint<Aug>>,
1478) -> Result<RelationDesc, PlanError> {
1479    let names: Vec<_> = columns
1480        .iter()
1481        .map(|c| normalize::column_name(c.name.clone()))
1482        .collect();
1483
1484    if let Some(dup) = names.iter().duplicates().next() {
1485        sql_bail!("column {} specified more than once", dup.quoted());
1486    }
1487
1488    // Build initial relation type that handles declared data types
1489    // and NOT NULL constraints.
1490    let mut column_types = Vec::with_capacity(columns.len());
1491    let mut keys = Vec::new();
1492
1493    for (i, c) in columns.into_iter().enumerate() {
1494        let aug_data_type = &c.data_type;
1495        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1496        let mut nullable = true;
1497        for option in &c.options {
1498            match &option.option {
1499                ColumnOption::NotNull => nullable = false,
1500                ColumnOption::Default(_) => {
1501                    bail_unsupported!("Source export with default value")
1502                }
1503                ColumnOption::Unique { is_primary } => {
1504                    keys.push(vec![i]);
1505                    if *is_primary {
1506                        nullable = false;
1507                    }
1508                }
1509                other => {
1510                    bail_unsupported!(format!("Source export with column constraint: {}", other))
1511                }
1512            }
1513        }
1514        column_types.push(ty.nullable(nullable));
1515    }
1516
1517    let mut seen_primary = false;
1518    'c: for constraint in constraints {
1519        match constraint {
1520            TableConstraint::Unique {
1521                name: _,
1522                columns,
1523                is_primary,
1524                nulls_not_distinct,
1525            } => {
1526                if seen_primary && *is_primary {
1527                    sql_bail!(
1528                        "multiple primary keys for source export {} are not allowed",
1529                        name.to_ast_string_stable()
1530                    );
1531                }
1532                seen_primary = *is_primary || seen_primary;
1533
1534                let mut key = vec![];
1535                for column in columns {
1536                    let column = normalize::column_name(column.clone());
1537                    match names.iter().position(|name| *name == column) {
1538                        None => sql_bail!("unknown column in constraint: {}", column),
1539                        Some(i) => {
1540                            let nullable = &mut column_types[i].nullable;
1541                            if *is_primary {
1542                                if *nulls_not_distinct {
1543                                    sql_bail!(
1544                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1545                                    );
1546                                }
1547                                *nullable = false;
1548                            } else if !(*nulls_not_distinct || !*nullable) {
1549                                // Non-primary key unique constraints are only keys if all of their
1550                                // columns are `NOT NULL` or the constraint is `NULLS NOT DISTINCT`.
1551                                break 'c;
1552                            }
1553
1554                            key.push(i);
1555                        }
1556                    }
1557                }
1558
1559                if *is_primary {
1560                    keys.insert(0, key);
1561                } else {
1562                    keys.push(key);
1563                }
1564            }
1565            TableConstraint::ForeignKey { .. } => {
1566                bail_unsupported!("Source export with a foreign key")
1567            }
1568            TableConstraint::Check { .. } => {
1569                bail_unsupported!("Source export with a check constraint")
1570            }
1571        }
1572    }
1573
1574    let typ = SqlRelationType::new(column_types).with_keys(keys);
1575    let desc = RelationDesc::new(typ, names);
1576    Ok(desc)
1577}
1578
// Declares the `WITH` options understood by `CREATE SUBSOURCE`.
//
// `plan_create_subsource` converts the statement's `with_options` into
// `CreateSubsourceOptionExtracted` via `try_into` and destructures the fields
// `progress`, `external_reference`, `retain_history`, `text_columns`,
// `exclude_columns`, `details` and `seen`.
//
// As used there, entries with a `Default(..)` are plain values (`progress` is
// used as a `bool`, `text_columns` is iterated directly), while entries
// without one are optional (`external_reference.is_some()`,
// `details.as_ref().ok_or_else(..)`).
1579generate_extracted_config!(
1580    CreateSubsourceOption,
1581    (Progress, bool, Default(false)),
1582    (ExternalReference, UnresolvedItemName),
1583    (RetainHistory, OptionalDuration),
1584    (TextColumns, Vec::<Ident>, Default(vec![])),
1585    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1586    (Details, String)
1587);
1588
1589pub fn plan_create_subsource(
1590    scx: &StatementContext,
1591    stmt: CreateSubsourceStatement<Aug>,
1592) -> Result<Plan, PlanError> {
1593    let CreateSubsourceStatement {
1594        name,
1595        columns,
1596        of_source,
1597        constraints,
1598        if_not_exists,
1599        with_options,
1600    } = &stmt;
1601
1602    let CreateSubsourceOptionExtracted {
1603        progress,
1604        retain_history,
1605        external_reference,
1606        text_columns,
1607        exclude_columns,
1608        details,
1609        seen: _,
1610    } = with_options.clone().try_into()?;
1611
1612    // This invariant is enforced during purification; we are responsible for
1613    // creating the AST for subsources as a response to CREATE SOURCE
1614    // statements, so this would fire in integration testing if we failed to
1615    // uphold it.
1616    if !(progress ^ (external_reference.is_some() && of_source.is_some())) {
1617        bail_internal!(
1618            "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
1619        );
1620    }
1621
1622    let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1623
1624    let data_source = if let Some(source_reference) = of_source {
1625        // If the new source table syntax is forced we should not be creating any non-progress
1626        // subsources.
1627        if scx.catalog.system_vars().enable_create_table_from_source()
1628            && scx.catalog.system_vars().force_source_table_syntax()
1629        {
1630            Err(PlanError::UseTablesForSources(
1631                "CREATE SUBSOURCE".to_string(),
1632            ))?;
1633        }
1634
1635        // This is a subsource with the "natural" dependency order, i.e. it is
1636        // not a legacy subsource with the inverted structure.
1637        let ingestion_id = *source_reference.item_id();
1638        let external_reference = external_reference.ok_or_else(|| {
1639            sql_err!("CREATE SUBSOURCE with REFERENCES requires EXTERNAL REFERENCE option")
1640        })?;
1641
1642        // Decode the details option stored on the subsource statement, which contains information
1643        // created during the purification process.
1644        let details = details
1645            .as_ref()
1646            .ok_or_else(|| internal_err!("source-export subsource missing details"))?;
1647        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1648        let details =
1649            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1650        let details =
1651            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1652        let details = match details {
1653            SourceExportStatementDetails::Postgres { table } => {
1654                SourceExportDetails::Postgres(PostgresSourceExportDetails {
1655                    column_casts: crate::pure::postgres::generate_column_casts(
1656                        scx,
1657                        &table,
1658                        &text_columns,
1659                    )?,
1660                    table,
1661                })
1662            }
1663            SourceExportStatementDetails::MySql {
1664                table,
1665                initial_gtid_set,
1666                binlog_full_metadata,
1667            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1668                table,
1669                initial_gtid_set,
1670                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1671                exclude_columns: exclude_columns
1672                    .into_iter()
1673                    .map(|c| c.into_string())
1674                    .collect(),
1675                binlog_full_metadata,
1676            }),
1677            SourceExportStatementDetails::SqlServer {
1678                table,
1679                capture_instance,
1680                initial_lsn,
1681            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1682                capture_instance,
1683                table,
1684                initial_lsn,
1685                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1686                exclude_columns: exclude_columns
1687                    .into_iter()
1688                    .map(|c| c.into_string())
1689                    .collect(),
1690            }),
1691            SourceExportStatementDetails::LoadGenerator { output } => {
1692                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1693            }
1694            SourceExportStatementDetails::Kafka {} => {
1695                bail_unsupported!("subsources cannot reference Kafka sources")
1696            }
1697        };
1698        DataSourceDesc::IngestionExport {
1699            ingestion_id,
1700            external_reference,
1701            details,
1702            // Subsources don't currently support non-default envelopes / encoding
1703            data_config: SourceExportDataConfig {
1704                envelope: SourceEnvelope::None(NoneEnvelope {
1705                    key_envelope: KeyEnvelope::None,
1706                    key_arity: 0,
1707                }),
1708                encoding: None,
1709            },
1710        }
1711    } else if progress {
1712        DataSourceDesc::Progress
1713    } else {
1714        sql_bail!("CREATE SUBSOURCE must specify one of PROGRESS or REFERENCES option")
1715    };
1716
1717    let if_not_exists = *if_not_exists;
1718    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1719
1720    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1721
1722    let compaction_window = plan_retain_history_option(scx, retain_history)?;
1723    let source = Source {
1724        create_sql,
1725        data_source,
1726        desc,
1727        compaction_window,
1728    };
1729
1730    Ok(Plan::CreateSource(CreateSourcePlan {
1731        name,
1732        source,
1733        if_not_exists,
1734        timeline: Timeline::EpochMilliseconds,
1735        in_cluster: None,
1736    }))
1737}
1738
// Declares the `WITH` options understood by `CREATE TABLE ... FROM SOURCE`.
//
// `plan_create_table_from_source` converts the statement's `with_options`
// into `TableFromSourceOptionExtracted` via `try_into` and destructures the
// fields `text_columns`, `exclude_columns`, `retain_history`, `partition_by`,
// `details` and `seen`.
//
// As used there, `partition_by` and `details` are optional (`if let Some(..)`
// and `.as_ref().ok_or_else(..)` respectively), while `text_columns` and
// `exclude_columns` default to empty lists and are iterated directly.
1739generate_extracted_config!(
1740    TableFromSourceOption,
1741    (TextColumns, Vec::<Ident>, Default(vec![])),
1742    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1743    (PartitionBy, Vec<Ident>),
1744    (RetainHistory, OptionalDuration),
1745    (Details, String)
1746);
1747
1748pub fn plan_create_table_from_source(
1749    scx: &StatementContext,
1750    stmt: CreateTableFromSourceStatement<Aug>,
1751) -> Result<Plan, PlanError> {
1752    if !scx.catalog.system_vars().enable_create_table_from_source() {
1753        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1754    }
1755
1756    let CreateTableFromSourceStatement {
1757        name,
1758        columns,
1759        constraints,
1760        if_not_exists,
1761        source,
1762        external_reference,
1763        envelope,
1764        format,
1765        include_metadata,
1766        with_options,
1767    } = &stmt;
1768
1769    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1770
1771    let TableFromSourceOptionExtracted {
1772        text_columns,
1773        exclude_columns,
1774        retain_history,
1775        partition_by,
1776        details,
1777        seen: _,
1778    } = with_options.clone().try_into()?;
1779
1780    let source_item = scx.get_item_by_resolved_name(source)?;
1781    let ingestion_id = source_item.id();
1782
1783    // Decode the details option stored on the statement, which contains information
1784    // created during the purification process.
1785    let details = details
1786        .as_ref()
1787        .ok_or_else(|| internal_err!("source-export missing details"))?;
1788    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1789    let details =
1790        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1791    let details =
1792        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1793
1794    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1795        && include_metadata
1796            .iter()
1797            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1798    {
1799        // TODO(guswynn): should this be `bail_unsupported!`?
1800        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
1801    }
1802    if !matches!(
1803        details,
1804        SourceExportStatementDetails::Kafka { .. }
1805            | SourceExportStatementDetails::LoadGenerator { .. }
1806    ) && !include_metadata.is_empty()
1807    {
1808        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1809    }
1810
1811    let details = match details {
1812        SourceExportStatementDetails::Postgres { table } => {
1813            SourceExportDetails::Postgres(PostgresSourceExportDetails {
1814                column_casts: crate::pure::postgres::generate_column_casts(
1815                    scx,
1816                    &table,
1817                    &text_columns,
1818                )?,
1819                table,
1820            })
1821        }
1822        SourceExportStatementDetails::MySql {
1823            table,
1824            initial_gtid_set,
1825            binlog_full_metadata,
1826        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1827            table,
1828            initial_gtid_set,
1829            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1830            exclude_columns: exclude_columns
1831                .into_iter()
1832                .map(|c| c.into_string())
1833                .collect(),
1834            binlog_full_metadata,
1835        }),
1836        SourceExportStatementDetails::SqlServer {
1837            table,
1838            capture_instance,
1839            initial_lsn,
1840        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1841            table,
1842            capture_instance,
1843            initial_lsn,
1844            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1845            exclude_columns: exclude_columns
1846                .into_iter()
1847                .map(|c| c.into_string())
1848                .collect(),
1849        }),
1850        SourceExportStatementDetails::LoadGenerator { output } => {
1851            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1852        }
1853        SourceExportStatementDetails::Kafka {} => {
1854            if !include_metadata.is_empty()
1855                && !matches!(
1856                    envelope,
1857                    ast::SourceEnvelope::Upsert { .. }
1858                        | ast::SourceEnvelope::None
1859                        | ast::SourceEnvelope::Debezium
1860                )
1861            {
1862                // TODO(guswynn): should this be `bail_unsupported!`?
1863                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1864            }
1865
1866            let metadata_columns = include_metadata
1867                .into_iter()
1868                .flat_map(|item| match item {
1869                    SourceIncludeMetadata::Timestamp { alias } => {
1870                        let name = match alias {
1871                            Some(name) => name.to_string(),
1872                            None => "timestamp".to_owned(),
1873                        };
1874                        Some((name, KafkaMetadataKind::Timestamp))
1875                    }
1876                    SourceIncludeMetadata::Partition { alias } => {
1877                        let name = match alias {
1878                            Some(name) => name.to_string(),
1879                            None => "partition".to_owned(),
1880                        };
1881                        Some((name, KafkaMetadataKind::Partition))
1882                    }
1883                    SourceIncludeMetadata::Offset { alias } => {
1884                        let name = match alias {
1885                            Some(name) => name.to_string(),
1886                            None => "offset".to_owned(),
1887                        };
1888                        Some((name, KafkaMetadataKind::Offset))
1889                    }
1890                    SourceIncludeMetadata::Headers { alias } => {
1891                        let name = match alias {
1892                            Some(name) => name.to_string(),
1893                            None => "headers".to_owned(),
1894                        };
1895                        Some((name, KafkaMetadataKind::Headers))
1896                    }
1897                    SourceIncludeMetadata::Header {
1898                        alias,
1899                        key,
1900                        use_bytes,
1901                    } => Some((
1902                        alias.to_string(),
1903                        KafkaMetadataKind::Header {
1904                            key: key.clone(),
1905                            use_bytes: *use_bytes,
1906                        },
1907                    )),
1908                    SourceIncludeMetadata::Key { .. } => {
1909                        // handled below
1910                        None
1911                    }
1912                })
1913                .collect();
1914
1915            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1916        }
1917    };
1918
1919    let source_connection = &source_item
1920        .source_desc()?
1921        .ok_or_else(|| sql_err!("item is not a source"))?
1922        .connection;
1923
1924    // Some source-types (e.g. postgres, mysql, multi-output load-gen sources) define a value_schema
1925    // during purification and define the `columns` and `constraints` fields for the statement,
1926    // whereas other source-types (e.g. kafka, single-output load-gen sources) do not, so instead
1927    // we use the source connection's default schema.
1928    let (key_desc, value_desc) =
1929        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1930            let columns = match columns {
1931                TableFromSourceColumns::Defined(columns) => columns,
1932                _ => bail_internal!("expected column definitions to be present"),
1933            };
1934            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1935            (None, desc)
1936        } else {
1937            let key_desc = source_connection.default_key_desc();
1938            let value_desc = source_connection.default_value_desc();
1939            (Some(key_desc), value_desc)
1940        };
1941
1942    let metadata_columns_desc = match &details {
1943        SourceExportDetails::Kafka(KafkaSourceExportDetails {
1944            metadata_columns, ..
1945        }) => kafka_metadata_columns_desc(metadata_columns),
1946        _ => vec![],
1947    };
1948
1949    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1950        scx,
1951        &envelope,
1952        format,
1953        key_desc,
1954        value_desc,
1955        include_metadata,
1956        metadata_columns_desc,
1957        source_connection,
1958    )?;
1959    if let TableFromSourceColumns::Named(col_names) = columns {
1960        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1961    }
1962
1963    let names: Vec<_> = desc.iter_names().cloned().collect();
1964    if let Some(dup) = names.iter().duplicates().next() {
1965        sql_bail!("column {} specified more than once", dup.quoted());
1966    }
1967
1968    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1969
1970    // Allow users to specify a timeline. If they do not, determine a default
1971    // timeline for the source.
1972    let timeline = match envelope {
1973        SourceEnvelope::CdcV2 => {
1974            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1975        }
1976        _ => Timeline::EpochMilliseconds,
1977    };
1978
1979    if let Some(partition_by) = partition_by {
1980        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1981        check_partition_by(&desc, partition_by)?;
1982    }
1983
1984    let data_source = DataSourceDesc::IngestionExport {
1985        ingestion_id,
1986        // Populated during purification.
1987        external_reference: external_reference
1988            .as_ref()
1989            .ok_or_else(|| sql_err!("EXTERNAL REFERENCE is required"))?
1990            .clone(),
1991        details,
1992        data_config: SourceExportDataConfig { envelope, encoding },
1993    };
1994
1995    let if_not_exists = *if_not_exists;
1996
1997    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1998
1999    let compaction_window = plan_retain_history_option(scx, retain_history)?;
2000    let table = Table {
2001        create_sql,
2002        desc: VersionedRelationDesc::new(desc),
2003        temporary: false,
2004        compaction_window,
2005        data_source: TableDataSource::DataSource {
2006            desc: data_source,
2007            timeline,
2008        },
2009    };
2010
2011    Ok(Plan::CreateTable(CreateTablePlan {
2012        name,
2013        table,
2014        if_not_exists,
2015    }))
2016}
2017
// Declares the options understood by `LOAD GENERATOR` sources.
//
// `load_generator_ast_to_generator` converts the option list into
// `LoadGeneratorOptionExtracted` via `try_into`; the `seen` set on that struct
// is what `ensure_only_valid_options` checks against the options permitted
// for each generator.
//
// `AsOf` and `UpTo` carry defaults (`0` and `u64::MAX`). The remaining
// entries have no default; the ones read in this file are treated as optional
// (e.g. `scale_factor.unwrap_or(0.01)`, `keys.ok_or_else(..)`).
2018generate_extracted_config!(
2019    LoadGeneratorOption,
2020    (TickInterval, Duration),
2021    (AsOf, u64, Default(0_u64)),
2022    (UpTo, u64, Default(u64::MAX)),
2023    (ScaleFactor, f64),
2024    (MaxCardinality, u64),
2025    (Keys, u64),
2026    (SnapshotRounds, u64),
2027    (TransactionalSnapshot, bool),
2028    (ValueSize, u64),
2029    (Seed, u64),
2030    (Partitions, u64),
2031    (BatchSize, u64)
2032);
2033
2034impl LoadGeneratorOptionExtracted {
2035    pub(super) fn ensure_only_valid_options(
2036        &self,
2037        loadgen: &ast::LoadGenerator,
2038    ) -> Result<(), PlanError> {
2039        use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2040
2041        let mut options = self.seen.clone();
2042
2043        let permitted_options: &[_] = match loadgen {
2044            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2045            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2046            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2047            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2048            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2049            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2050            ast::LoadGenerator::KeyValue => &[
2051                TickInterval,
2052                Keys,
2053                SnapshotRounds,
2054                TransactionalSnapshot,
2055                ValueSize,
2056                Seed,
2057                Partitions,
2058                BatchSize,
2059            ],
2060        };
2061
2062        for o in permitted_options {
2063            options.remove(o);
2064        }
2065
2066        if !options.is_empty() {
2067            sql_bail!(
2068                "{} load generators do not support {} values",
2069                loadgen,
2070                options.iter().join(", ")
2071            )
2072        }
2073
2074        Ok(())
2075    }
2076}
2077
2078pub(crate) fn load_generator_ast_to_generator(
2079    scx: &StatementContext,
2080    loadgen: &ast::LoadGenerator,
2081    options: &[LoadGeneratorOption<Aug>],
2082    include_metadata: &[SourceIncludeMetadata],
2083) -> Result<LoadGenerator, PlanError> {
2084    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2085    extracted.ensure_only_valid_options(loadgen)?;
2086
2087    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2088        sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2089    }
2090
2091    let load_generator = match loadgen {
2092        ast::LoadGenerator::Auction => LoadGenerator::Auction,
2093        ast::LoadGenerator::Clock => {
2094            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2095            LoadGenerator::Clock
2096        }
2097        ast::LoadGenerator::Counter => {
2098            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2099            let LoadGeneratorOptionExtracted {
2100                max_cardinality, ..
2101            } = extracted;
2102            LoadGenerator::Counter { max_cardinality }
2103        }
2104        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2105        ast::LoadGenerator::Datums => {
2106            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2107            LoadGenerator::Datums
2108        }
2109        ast::LoadGenerator::Tpch => {
2110            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2111
2112            // Default to 0.01 scale factor (=10MB).
2113            let sf: f64 = scale_factor.unwrap_or(0.01);
2114            if !sf.is_finite() || sf < 0.0 {
2115                sql_bail!("unsupported scale factor {sf}");
2116            }
2117
2118            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2119                let total = (sf * multiplier).floor();
2120                let mut i = i64::try_cast_from(total)
2121                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2122                if i < 1 {
2123                    i = 1;
2124                }
2125                Ok(i)
2126            };
2127
2128            // The multiplications here are safely unchecked because they will
2129            // overflow to infinity, which will be caught by f64_to_i64.
2130            let count_supplier = f_to_i(10_000f64)?;
2131            let count_part = f_to_i(200_000f64)?;
2132            let count_customer = f_to_i(150_000f64)?;
2133            let count_orders = f_to_i(150_000f64 * 10f64)?;
2134            let count_clerk = f_to_i(1_000f64)?;
2135
2136            LoadGenerator::Tpch {
2137                count_supplier,
2138                count_part,
2139                count_customer,
2140                count_orders,
2141                count_clerk,
2142            }
2143        }
2144        mz_sql_parser::ast::LoadGenerator::KeyValue => {
2145            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2146            let LoadGeneratorOptionExtracted {
2147                keys,
2148                snapshot_rounds,
2149                transactional_snapshot,
2150                value_size,
2151                tick_interval,
2152                seed,
2153                partitions,
2154                batch_size,
2155                ..
2156            } = extracted;
2157
2158            let mut include_offset = None;
2159            for im in include_metadata {
2160                match im {
2161                    SourceIncludeMetadata::Offset { alias } => {
2162                        include_offset = match alias {
2163                            Some(alias) => Some(alias.to_string()),
2164                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2165                        }
2166                    }
2167                    SourceIncludeMetadata::Key { .. } => continue,
2168
2169                    _ => {
2170                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
2171                    }
2172                };
2173            }
2174
2175            let lgkv = KeyValueLoadGenerator {
2176                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2177                snapshot_rounds: snapshot_rounds
2178                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2179                // Defaults to true.
2180                transactional_snapshot: transactional_snapshot.unwrap_or(true),
2181                value_size: value_size
2182                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2183                partitions: partitions
2184                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2185                tick_interval,
2186                batch_size: batch_size
2187                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2188                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2189                include_offset,
2190            };
2191
2192            if lgkv.keys == 0
2193                || lgkv.partitions == 0
2194                || lgkv.value_size == 0
2195                || lgkv.batch_size == 0
2196            {
2197                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2198            }
2199
2200            if lgkv.keys % lgkv.partitions != 0 {
2201                sql_bail!("KEYS must be a multiple of PARTITIONS")
2202            }
2203
2204            if lgkv.batch_size > lgkv.keys {
2205                sql_bail!("KEYS must be larger than BATCH SIZE")
2206            }
2207
2208            // This constraints simplifies the source implementation.
2209            // We can lift it later.
2210            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2211                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2212            }
2213
2214            if lgkv.snapshot_rounds == 0 {
2215                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2216            }
2217
2218            LoadGenerator::KeyValue(lgkv)
2219        }
2220    };
2221
2222    Ok(load_generator)
2223}
2224
2225fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2226    let before = value_desc.get_by_name(&"before".into());
2227    let (after_idx, after_ty) = value_desc
2228        .get_by_name(&"after".into())
2229        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2230    let before_idx = if let Some((before_idx, before_ty)) = before {
2231        if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2232            sql_bail!("'before' column must be of type record");
2233        }
2234        if before_ty != after_ty {
2235            sql_bail!("'before' type differs from 'after' column");
2236        }
2237        Some(before_idx)
2238    } else {
2239        None
2240    };
2241    Ok((before_idx, after_idx))
2242}
2243
2244fn get_encoding(
2245    scx: &StatementContext,
2246    format: &FormatSpecifier<Aug>,
2247    envelope: &ast::SourceEnvelope,
2248) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2249    let encoding = match format {
2250        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2251        FormatSpecifier::KeyValue { key, value } => {
2252            let key = {
2253                let encoding = get_encoding_inner(scx, key)?;
2254                Some(encoding.key.unwrap_or(encoding.value))
2255            };
2256            let value = get_encoding_inner(scx, value)?.value;
2257            SourceDataEncoding { key, value }
2258        }
2259    };
2260
2261    let requires_keyvalue = matches!(
2262        envelope,
2263        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2264    );
2265    let is_keyvalue = encoding.key.is_some();
2266    if requires_keyvalue && !is_keyvalue {
2267        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2268    };
2269
2270    Ok(encoding)
2271}
2272
2273/// Determine the cluster ID to use for this item.
2274///
2275/// If `in_cluster` is `None` we will update it to refer to the default cluster.
2276/// Because of this, do not normalize/canonicalize the create SQL statement
2277/// until after calling this function.
2278fn source_sink_cluster_config<'a, 'ctx>(
2279    scx: &'a StatementContext<'ctx>,
2280    in_cluster: &mut Option<ResolvedClusterName>,
2281) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2282    let cluster = match in_cluster {
2283        None => {
2284            let cluster = scx.catalog.resolve_cluster(None)?;
2285            *in_cluster = Some(ResolvedClusterName {
2286                id: cluster.id(),
2287                print_name: None,
2288            });
2289            cluster
2290        }
2291        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2292    };
2293
2294    Ok(cluster)
2295}
2296
// Options accepted alongside an inline Avro schema. `get_encoding_inner`
// reads the extracted `confluent_wire_format` flag; the declaration gives it
// a default of `true`.
generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));

// Options accepted alongside a Glue-backed Avro schema: a `SchemaName` string.
generate_extracted_config!(GlueAvroOption, (SchemaName, String));
2300
/// Resolved schema information for an Avro format, as assembled by
/// `get_encoding_inner` from each `AvroSchema` variant.
#[derive(Debug)]
pub struct Schema {
    /// Schema for the key, if one is available. Inline and Glue schemas leave
    /// this as `None`; schema-registry seeds may supply it.
    pub key_schema: Option<String>,
    /// Schema for the value.
    pub value_schema: String,
    /// Reference schemas for the key schema, in dependency order.
    pub key_reference_schemas: Vec<String>,
    /// Reference schemas for the value schema, in dependency order.
    pub value_reference_schemas: Vec<String>,
    /// Wire-format dispatch and the registry connection to fetch writer
    /// schemas from. Built directly by each `AvroSchema` variant rather
    /// than reconstructed from a (csr_connection, bool) pair downstream.
    pub wire_format: WireFormat<ReferencedConnection>,
}
2314
2315fn get_encoding_inner(
2316    scx: &StatementContext,
2317    format: &Format<Aug>,
2318) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2319    let value = match format {
2320        Format::Bytes => DataEncoding::Bytes,
2321        Format::Avro(schema) => {
2322            let Schema {
2323                key_schema,
2324                value_schema,
2325                key_reference_schemas,
2326                value_reference_schemas,
2327                wire_format,
2328            } = match schema {
2329                // TODO(jldlaughlin): we need a way to pass in primary key information
2330                // when building a source from a string or file.
2331                AvroSchema::InlineSchema {
2332                    schema: ast::Schema { schema },
2333                    with_options,
2334                } => {
2335                    let AvroSchemaOptionExtracted {
2336                        confluent_wire_format,
2337                        ..
2338                    } = with_options.clone().try_into()?;
2339                    let wire_format = if confluent_wire_format {
2340                        WireFormat::Confluent { registry: None }
2341                    } else {
2342                        WireFormat::None
2343                    };
2344                    Schema {
2345                        key_schema: None,
2346                        value_schema: schema.clone(),
2347                        key_reference_schemas: vec![],
2348                        value_reference_schemas: vec![],
2349                        wire_format,
2350                    }
2351                }
2352                AvroSchema::Csr {
2353                    csr_connection:
2354                        CsrConnectionAvro {
2355                            connection,
2356                            seed,
2357                            key_strategy: _,
2358                            value_strategy: _,
2359                        },
2360                } => {
2361                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
2362                    let csr_connection = match item.connection()? {
2363                        Connection::Csr(_) => item.id(),
2364                        _ => {
2365                            sql_bail!(
2366                                "{} is not a Confluent Schema Registry connection",
2367                                scx.catalog
2368                                    .resolve_full_name(item.name())
2369                                    .to_string()
2370                                    .quoted()
2371                            )
2372                        }
2373                    };
2374
2375                    if let Some(seed) = seed {
2376                        Schema {
2377                            key_schema: seed.key_schema.clone(),
2378                            value_schema: seed.value_schema.clone(),
2379                            key_reference_schemas: seed.key_reference_schemas.clone(),
2380                            value_reference_schemas: seed.value_reference_schemas.clone(),
2381                            wire_format: WireFormat::Confluent {
2382                                registry: Some(csr_connection),
2383                            },
2384                        }
2385                    } else {
2386                        sql_bail!("Avro CSR seed resolution has not been performed")
2387                    }
2388                }
2389                AvroSchema::Glue {
2390                    connection,
2391                    with_options: _,
2392                    seed,
2393                } => {
2394                    let item = scx.get_item_by_resolved_name(connection)?;
2395                    let glue_connection = match item.connection()? {
2396                        Connection::GlueSchemaRegistry(_) => item.id(),
2397                        _ => {
2398                            sql_bail!(
2399                                "{} is not an AWS Glue Schema Registry connection",
2400                                scx.catalog
2401                                    .resolve_full_name(item.name())
2402                                    .to_string()
2403                                    .quoted()
2404                            )
2405                        }
2406                    };
2407
2408                    // `SCHEMA NAME` requiredness is enforced during
2409                    // purification, which also populates `seed`. By the time
2410                    // planning runs the option is guaranteed present.
2411                    let Some(seed) = seed else {
2412                        sql_bail!("Avro Glue seed resolution has not been performed");
2413                    };
2414
2415                    Schema {
2416                        key_schema: None,
2417                        value_schema: seed.value_schema.clone(),
2418                        key_reference_schemas: vec![],
2419                        value_reference_schemas: vec![],
2420                        wire_format: WireFormat::Glue {
2421                            registry: Some(glue_connection),
2422                        },
2423                    }
2424                }
2425            };
2426
2427            if let Some(key_schema) = key_schema {
2428                return Ok(SourceDataEncoding {
2429                    key: Some(DataEncoding::Avro(AvroEncoding {
2430                        schema: key_schema,
2431                        reference_schemas: key_reference_schemas,
2432                        wire_format: wire_format.clone(),
2433                    })),
2434                    value: DataEncoding::Avro(AvroEncoding {
2435                        schema: value_schema,
2436                        reference_schemas: value_reference_schemas,
2437                        wire_format,
2438                    }),
2439                });
2440            } else {
2441                DataEncoding::Avro(AvroEncoding {
2442                    schema: value_schema,
2443                    reference_schemas: value_reference_schemas,
2444                    wire_format,
2445                })
2446            }
2447        }
2448        Format::Protobuf(schema) => match schema {
2449            ProtobufSchema::Csr {
2450                csr_connection:
2451                    CsrConnectionProtobuf {
2452                        connection:
2453                            CsrConnection {
2454                                connection,
2455                                options,
2456                            },
2457                        seed,
2458                    },
2459            } => {
2460                if let Some(CsrSeedProtobuf { key, value }) = seed {
2461                    let item = scx.get_item_by_resolved_name(connection)?;
2462                    let _ = match item.connection()? {
2463                        Connection::Csr(connection) => connection,
2464                        _ => {
2465                            sql_bail!(
2466                                "{} is not a schema registry connection",
2467                                scx.catalog
2468                                    .resolve_full_name(item.name())
2469                                    .to_string()
2470                                    .quoted()
2471                            )
2472                        }
2473                    };
2474
2475                    if !options.is_empty() {
2476                        sql_bail!("Protobuf CSR connections do not support any options");
2477                    }
2478
2479                    let value = DataEncoding::Protobuf(ProtobufEncoding {
2480                        descriptors: strconv::parse_bytes(&value.schema)?,
2481                        message_name: value.message_name.clone(),
2482                        confluent_wire_format: true,
2483                    });
2484                    if let Some(key) = key {
2485                        return Ok(SourceDataEncoding {
2486                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2487                                descriptors: strconv::parse_bytes(&key.schema)?,
2488                                message_name: key.message_name.clone(),
2489                                confluent_wire_format: true,
2490                            })),
2491                            value,
2492                        });
2493                    }
2494                    value
2495                } else {
2496                    sql_bail!("Protobuf CSR seed resolution has not been performed")
2497                }
2498            }
2499            ProtobufSchema::InlineSchema {
2500                message_name,
2501                schema: ast::Schema { schema },
2502            } => {
2503                let descriptors = strconv::parse_bytes(schema)?;
2504
2505                DataEncoding::Protobuf(ProtobufEncoding {
2506                    descriptors,
2507                    message_name: message_name.to_owned(),
2508                    confluent_wire_format: false,
2509                })
2510            }
2511        },
2512        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2513            regex: mz_repr::adt::regex::Regex::new(regex, false)
2514                .map_err(|e| sql_err!("parsing regex: {e}"))?,
2515        }),
2516        Format::Csv { columns, delimiter } => {
2517            let columns = match columns {
2518                CsvColumns::Header { names } => {
2519                    if names.is_empty() {
2520                        sql_bail!("[internal error] column spec should get names in purify")
2521                    }
2522                    ColumnSpec::Header {
2523                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
2524                    }
2525                }
2526                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2527            };
2528            DataEncoding::Csv(CsvEncoding {
2529                columns,
2530                delimiter: u8::try_from(*delimiter)
2531                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2532            })
2533        }
2534        Format::Json { array: false } => DataEncoding::Json,
2535        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2536        Format::Text => DataEncoding::Text,
2537    };
2538    Ok(SourceDataEncoding { key: None, value })
2539}
2540
2541/// Extract the key envelope, if it is requested
2542fn get_key_envelope(
2543    included_items: &[SourceIncludeMetadata],
2544    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2545    key_envelope_no_encoding: bool,
2546) -> Result<KeyEnvelope, PlanError> {
2547    let key_definition = included_items
2548        .iter()
2549        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2550    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2551        match (alias, encoding.and_then(|e| e.key.as_ref())) {
2552            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2553            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2554            (Some(name), _) if key_envelope_no_encoding => {
2555                Ok(KeyEnvelope::Named(name.as_str().to_string()))
2556            }
2557            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2558            (_, None) => {
2559                // `kd.alias` == `None` means `INCLUDE KEY`
2560                // `kd.alias` == `Some(_) means INCLUDE KEY AS ___`
2561                // These both make sense with the same error message
2562                sql_bail!(
2563                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2564                        got bare FORMAT"
2565                );
2566            }
2567        }
2568    } else {
2569        Ok(KeyEnvelope::None)
2570    }
2571}
2572
2573/// Gets the key envelope for a given key encoding when no name for the key has
2574/// been requested by the user.
2575fn get_unnamed_key_envelope(
2576    key: Option<&DataEncoding<ReferencedConnection>>,
2577) -> Result<KeyEnvelope, PlanError> {
2578    // If the key is requested but comes from an unnamed type then it gets the name "key"
2579    //
2580    // Otherwise it gets the names of the columns in the type
2581    let is_composite = match key {
2582        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2583        Some(
2584            DataEncoding::Avro(_)
2585            | DataEncoding::Csv(_)
2586            | DataEncoding::Protobuf(_)
2587            | DataEncoding::Regex { .. },
2588        ) => true,
2589        None => false,
2590    };
2591
2592    if is_composite {
2593        Ok(KeyEnvelope::Flattened)
2594    } else {
2595        Ok(KeyEnvelope::Named("key".to_string()))
2596    }
2597}
2598
2599pub fn describe_create_view(
2600    _: &StatementContext,
2601    _: CreateViewStatement<Aug>,
2602) -> Result<StatementDesc, PlanError> {
2603    Ok(StatementDesc::new(None))
2604}
2605
2606pub fn plan_view(
2607    scx: &StatementContext,
2608    def: &mut ViewDefinition<Aug>,
2609    temporary: bool,
2610) -> Result<(QualifiedItemName, View), PlanError> {
2611    let create_sql = normalize::create_statement(
2612        scx,
2613        Statement::CreateView(CreateViewStatement {
2614            if_exists: IfExistsBehavior::Error,
2615            temporary,
2616            definition: def.clone(),
2617        }),
2618    )?;
2619
2620    let ViewDefinition {
2621        name,
2622        columns,
2623        query,
2624    } = def;
2625
2626    let query::PlannedRootQuery {
2627        expr,
2628        mut desc,
2629        finishing,
2630        scope: _,
2631    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2632    // We get back a trivial finishing, because `plan_root_query` applies the given finishing.
2633    // Note: Earlier, we were thinking to maybe persist the finishing information with the view
2634    // here to help with database-issues#236. However, in the meantime, there might be a better
2635    // approach to solve database-issues#236:
2636    // https://github.com/MaterializeInc/database-issues/issues/236#issuecomment-1688293709
2637    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2638        &finishing,
2639        expr.arity()
2640    ));
2641    if expr.contains_parameters()? {
2642        return Err(PlanError::ParameterNotAllowed("views".to_string()));
2643    }
2644
2645    let dependencies = expr
2646        .depends_on()
2647        .into_iter()
2648        .map(|gid| scx.catalog.resolve_item_id(&gid))
2649        .collect();
2650
2651    let name = if temporary {
2652        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2653    } else {
2654        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2655    };
2656
2657    plan_utils::maybe_rename_columns(
2658        format!("view {}", scx.catalog.resolve_full_name(&name)),
2659        &mut desc,
2660        columns,
2661    )?;
2662    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2663
2664    if let Some(dup) = names.iter().duplicates().next() {
2665        sql_bail!("column {} specified more than once", dup.quoted());
2666    }
2667
2668    let view = View {
2669        create_sql,
2670        expr,
2671        dependencies,
2672        column_names: names,
2673        temporary,
2674    };
2675
2676    Ok((name, view))
2677}
2678
2679pub fn plan_create_view(
2680    scx: &StatementContext,
2681    mut stmt: CreateViewStatement<Aug>,
2682) -> Result<Plan, PlanError> {
2683    let CreateViewStatement {
2684        temporary,
2685        if_exists,
2686        definition,
2687    } = &mut stmt;
2688    let (name, view) = plan_view(scx, definition, *temporary)?;
2689
2690    // Override the statement-level IfExistsBehavior with Skip if this is
2691    // explicitly requested in the PlanContext (the default is `false`).
2692    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2693
2694    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2695        let if_exists = true;
2696        let cascade = false;
2697        let maybe_item_to_drop = plan_drop_item(
2698            scx,
2699            ObjectType::View,
2700            if_exists,
2701            definition.name.clone(),
2702            cascade,
2703        )?;
2704
2705        // Check if the new View depends on the item that we would be replacing.
2706        if let Some(id) = maybe_item_to_drop {
2707            let dependencies = view.expr.depends_on();
2708            let invalid_drop = scx
2709                .get_item(&id)
2710                .global_ids()
2711                .any(|gid| dependencies.contains(&gid));
2712            if invalid_drop {
2713                let item = scx.catalog.get_item(&id);
2714                sql_bail!(
2715                    "cannot replace view {0}: depended upon by new {0} definition",
2716                    scx.catalog.resolve_full_name(item.name())
2717                );
2718            }
2719
2720            Some(id)
2721        } else {
2722            None
2723        }
2724    } else {
2725        None
2726    };
2727    let drop_ids = replace
2728        .map(|id| {
2729            scx.catalog
2730                .item_dependents(id)
2731                .into_iter()
2732                .map(|id| id.unwrap_item_id())
2733                .collect()
2734        })
2735        .unwrap_or_default();
2736
2737    validate_view_dependencies(scx, &view.dependencies.0)?;
2738
2739    // Check for an object in the catalog with this same name
2740    let full_name = scx.catalog.resolve_full_name(&name);
2741    let partial_name = PartialItemName::from(full_name.clone());
2742    // For PostgreSQL compatibility, we need to prevent creating views when
2743    // there is an existing object *or* type of the same name.
2744    if let (Ok(item), IfExistsBehavior::Error, false) = (
2745        scx.catalog.resolve_item_or_type(&partial_name),
2746        *if_exists,
2747        ignore_if_exists_errors,
2748    ) {
2749        return Err(PlanError::ItemAlreadyExists {
2750            name: full_name.to_string(),
2751            item_type: item.item_type(),
2752        });
2753    }
2754
2755    Ok(Plan::CreateView(CreateViewPlan {
2756        name,
2757        view,
2758        replace,
2759        drop_ids,
2760        if_not_exists: *if_exists == IfExistsBehavior::Skip,
2761        ambiguous_columns: *scx.ambiguous_columns.borrow(),
2762    }))
2763}
2764
2765/// Validate the dependencies of a (materialized) view.
2766fn validate_view_dependencies(
2767    scx: &StatementContext,
2768    dependencies: &BTreeSet<CatalogItemId>,
2769) -> Result<(), PlanError> {
2770    for id in dependencies {
2771        let item = scx.catalog.get_item(id);
2772        if item.replacement_target().is_some() {
2773            let name = scx.catalog.minimal_qualification(item.name());
2774            return Err(PlanError::InvalidDependency {
2775                name: name.to_string(),
2776                item_type: format!("replacement {}", item.item_type()),
2777            });
2778        }
2779    }
2780
2781    Ok(())
2782}
2783
2784pub fn describe_create_materialized_view(
2785    _: &StatementContext,
2786    _: CreateMaterializedViewStatement<Aug>,
2787) -> Result<StatementDesc, PlanError> {
2788    Ok(StatementDesc::new(None))
2789}
2790
2791pub fn describe_create_network_policy(
2792    _: &StatementContext,
2793    _: CreateNetworkPolicyStatement<Aug>,
2794) -> Result<StatementDesc, PlanError> {
2795    Ok(StatementDesc::new(None))
2796}
2797
2798pub fn describe_alter_network_policy(
2799    _: &StatementContext,
2800    _: AlterNetworkPolicyStatement<Aug>,
2801) -> Result<StatementDesc, PlanError> {
2802    Ok(StatementDesc::new(None))
2803}
2804
2805pub fn plan_create_materialized_view(
2806    scx: &StatementContext,
2807    mut stmt: CreateMaterializedViewStatement<Aug>,
2808) -> Result<Plan, PlanError> {
2809    let cluster_id =
2810        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2811    stmt.in_cluster = Some(ResolvedClusterName {
2812        id: cluster_id,
2813        print_name: None,
2814    });
2815
2816    let target_replica = match &stmt.in_cluster_replica {
2817        Some(replica_name) => {
2818            scx.require_feature_flag(&ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS)?;
2819
2820            let cluster = scx.catalog.get_cluster(cluster_id);
2821            let replica_id = cluster
2822                .replica_ids()
2823                .get(replica_name.as_str())
2824                .copied()
2825                .ok_or_else(|| {
2826                    CatalogError::UnknownClusterReplica(replica_name.as_str().to_string())
2827                })?;
2828            Some(replica_id)
2829        }
2830        None => None,
2831    };
2832
2833    let create_sql =
2834        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2835
2836    let partial_name = normalize::unresolved_item_name(stmt.name)?;
2837    let name = scx.allocate_qualified_name(partial_name.clone())?;
2838
2839    let query::PlannedRootQuery {
2840        expr,
2841        mut desc,
2842        finishing,
2843        scope: _,
2844    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2845    // We get back a trivial finishing, see comment in `plan_view`.
2846    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2847        &finishing,
2848        expr.arity()
2849    ));
2850    if expr.contains_parameters()? {
2851        return Err(PlanError::ParameterNotAllowed(
2852            "materialized views".to_string(),
2853        ));
2854    }
2855
2856    plan_utils::maybe_rename_columns(
2857        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2858        &mut desc,
2859        &stmt.columns,
2860    )?;
2861    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2862
2863    let MaterializedViewOptionExtracted {
2864        assert_not_null,
2865        partition_by,
2866        retain_history,
2867        refresh,
2868        seen: _,
2869    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2870
2871    if let Some(partition_by) = partition_by {
2872        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2873        check_partition_by(&desc, partition_by)?;
2874    }
2875
2876    let refresh_schedule = {
2877        let mut refresh_schedule = RefreshSchedule::default();
2878        let mut on_commits_seen = 0;
2879        for refresh_option_value in refresh {
2880            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2881                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2882            }
2883            match refresh_option_value {
2884                RefreshOptionValue::OnCommit => {
2885                    on_commits_seen += 1;
2886                }
2887                RefreshOptionValue::AtCreation => {
2888                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2889                    bail_internal!("REFRESH AT CREATION should have been purified away")
2890                }
2891                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
2892                    transform_ast::transform(scx, &mut time)?; // Desugar the expression
2893                    let ecx = &ExprContext {
2894                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2895                        name: "REFRESH AT",
2896                        scope: &Scope::empty(),
2897                        relation_type: &SqlRelationType::empty(),
2898                        allow_aggregates: false,
2899                        allow_subqueries: false,
2900                        allow_parameters: false,
2901                        allow_windows: false,
2902                    };
2903                    let hir = plan_expr(ecx, &time)?.cast_to(
2904                        ecx,
2905                        CastContext::Assignment,
2906                        &SqlScalarType::MzTimestamp,
2907                    )?;
2908                    // (mz_now was purified away to a literal earlier)
2909                    let timestamp = hir
2910                        .into_literal_mz_timestamp()
2911                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2912                    refresh_schedule.ats.push(timestamp);
2913                }
2914                RefreshOptionValue::Every(RefreshEveryOptionValue {
2915                    interval,
2916                    aligned_to,
2917                }) => {
2918                    let interval = Interval::try_from_value(Value::Interval(interval))?;
2919                    if interval.as_microseconds() <= 0 {
2920                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
2921                    }
2922                    if interval.months != 0 {
2923                        // This limitation is because we want Intervals to be cleanly convertable
2924                        // to a unix epoch timestamp difference. When the interval involves months, then
2925                        // this is not true anymore, because months have variable lengths.
2926                        // See `Timestamp::round_up`.
2927                        sql_bail!("REFRESH interval must not involve units larger than days");
2928                    }
2929                    let interval = interval.duration()?;
2930                    // `Interval::from_duration` (needed to unparse the interval, e.g. for
2931                    // `mz_materialized_view_refresh_strategies`) requires the micros to fit
2932                    // in an i64, which is a tighter bound than `Duration::duration` enforces.
2933                    // Reject too-large intervals here to avoid panicking later.
2934                    if u64::try_from(interval.as_millis()).is_err()
2935                        || Interval::from_duration(&interval).is_err()
2936                    {
2937                        sql_bail!("REFRESH interval too large");
2938                    }
2939                    if interval.as_micros() < 1000 {
2940                        sql_bail!("REFRESH interval must be at least 1 ms")
2941                    }
2942
2943                    let mut aligned_to = match aligned_to {
2944                        Some(aligned_to) => aligned_to,
2945                        None => {
2946                            soft_panic_or_log!(
2947                                "ALIGNED TO should have been filled in by purification"
2948                            );
2949                            sql_bail!(
2950                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2951                            )
2952                        }
2953                    };
2954
2955                    // Desugar the `aligned_to` expression
2956                    transform_ast::transform(scx, &mut aligned_to)?;
2957
2958                    let ecx = &ExprContext {
2959                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2960                        name: "REFRESH EVERY ... ALIGNED TO",
2961                        scope: &Scope::empty(),
2962                        relation_type: &SqlRelationType::empty(),
2963                        allow_aggregates: false,
2964                        allow_subqueries: false,
2965                        allow_parameters: false,
2966                        allow_windows: false,
2967                    };
2968                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2969                        ecx,
2970                        CastContext::Assignment,
2971                        &SqlScalarType::MzTimestamp,
2972                    )?;
2973                    // (mz_now was purified away to a literal earlier)
2974                    let aligned_to_const = aligned_to_hir
2975                        .into_literal_mz_timestamp()
2976                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2977
2978                    refresh_schedule.everies.push(RefreshEvery {
2979                        interval,
2980                        aligned_to: aligned_to_const,
2981                    });
2982                }
2983            }
2984        }
2985
2986        if on_commits_seen > 1 {
2987            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2988        }
2989        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2990            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2991        }
2992
2993        if refresh_schedule == RefreshSchedule::default() {
2994            None
2995        } else {
2996            Some(refresh_schedule)
2997        }
2998    };
2999
3000    let as_of = stmt.as_of.map(Timestamp::from);
3001    let compaction_window = plan_retain_history_option(scx, retain_history)?;
3002    let mut non_null_assertions = assert_not_null
3003        .into_iter()
3004        .map(normalize::column_name)
3005        .map(|assertion_name| {
3006            column_names
3007                .iter()
3008                .position(|col| col == &assertion_name)
3009                .ok_or_else(|| {
3010                    sql_err!(
3011                        "column {} in ASSERT NOT NULL option not found",
3012                        assertion_name.quoted()
3013                    )
3014                })
3015        })
3016        .collect::<Result<Vec<_>, _>>()?;
3017    non_null_assertions.sort();
3018    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
3019        let dup = &column_names[*dup];
3020        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
3021    }
3022
3023    if let Some(dup) = column_names.iter().duplicates().next() {
3024        sql_bail!("column {} specified more than once", dup.quoted());
3025    }
3026
3027    // Override the statement-level IfExistsBehavior with Skip if this is
3028    // explicitly requested in the PlanContext (the default is `false`).
3029    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
3030        Ok(true) => IfExistsBehavior::Skip,
3031        _ => stmt.if_exists,
3032    };
3033
3034    let mut replace = None;
3035    let mut if_not_exists = false;
3036    match if_exists {
3037        IfExistsBehavior::Replace => {
3038            let if_exists = true;
3039            let cascade = false;
3040            let replace_id = plan_drop_item(
3041                scx,
3042                ObjectType::MaterializedView,
3043                if_exists,
3044                partial_name.clone().into(),
3045                cascade,
3046            )?;
3047
3048            // Check if the new Materialized View depends on the item that we would be replacing.
3049            if let Some(id) = replace_id {
3050                let dependencies = expr.depends_on();
3051                let invalid_drop = scx
3052                    .get_item(&id)
3053                    .global_ids()
3054                    .any(|gid| dependencies.contains(&gid));
3055                if invalid_drop {
3056                    let item = scx.catalog.get_item(&id);
3057                    sql_bail!(
3058                        "cannot replace materialized view {0}: depended upon by new {0} definition",
3059                        scx.catalog.resolve_full_name(item.name())
3060                    );
3061                }
3062                replace = Some(id);
3063            }
3064        }
3065        IfExistsBehavior::Skip => if_not_exists = true,
3066        IfExistsBehavior::Error => (),
3067    }
3068    let drop_ids = replace
3069        .map(|id| {
3070            scx.catalog
3071                .item_dependents(id)
3072                .into_iter()
3073                .map(|id| id.unwrap_item_id())
3074                .collect()
3075        })
3076        .unwrap_or_default();
3077    let mut dependencies: BTreeSet<_> = expr
3078        .depends_on()
3079        .into_iter()
3080        .map(|gid| scx.catalog.resolve_item_id(&gid))
3081        .collect();
3082
3083    // Validate the replacement target, if one is given.
3084    let mut replacement_target = None;
3085    if let Some(target_name) = &stmt.replacement_for {
3086        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
3087
3088        let target = scx.get_item_by_resolved_name(target_name)?;
3089        if target.item_type() != CatalogItemType::MaterializedView {
3090            return Err(PlanError::InvalidReplacement {
3091                item_type: target.item_type(),
3092                item_name: scx.catalog.minimal_qualification(target.name()),
3093                replacement_type: CatalogItemType::MaterializedView,
3094                replacement_name: partial_name,
3095            });
3096        }
3097        if target.id().is_system() {
3098            sql_bail!(
3099                "cannot replace {} because it is required by the database system",
3100                scx.catalog.minimal_qualification(target.name()),
3101            );
3102        }
3103
3104        // Check for dependency cycles.
3105        for dependent in scx.catalog.item_dependents(target.id()) {
3106            if let ObjectId::Item(id) = dependent
3107                && dependencies.contains(&id)
3108            {
3109                sql_bail!(
3110                    "replacement would cause {} to depend on itself",
3111                    scx.catalog.minimal_qualification(target.name()),
3112                );
3113            }
3114        }
3115
3116        dependencies.insert(target.id());
3117
3118        for use_id in target.used_by() {
3119            let use_item = scx.get_item(use_id);
3120            if use_item.replacement_target() == Some(target.id()) {
3121                sql_bail!(
3122                    "cannot replace {} because it already has a replacement: {}",
3123                    scx.catalog.minimal_qualification(target.name()),
3124                    scx.catalog.minimal_qualification(use_item.name()),
3125                );
3126            }
3127        }
3128
3129        replacement_target = Some(target.id());
3130    }
3131
3132    validate_view_dependencies(scx, &dependencies)?;
3133
3134    // Check for an object in the catalog with this same name
3135    let full_name = scx.catalog.resolve_full_name(&name);
3136    let partial_name = PartialItemName::from(full_name.clone());
3137    // For PostgreSQL compatibility, we need to prevent creating materialized
3138    // views when there is an existing object *or* type of the same name.
3139    if let (IfExistsBehavior::Error, Ok(item)) =
3140        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
3141    {
3142        return Err(PlanError::ItemAlreadyExists {
3143            name: full_name.to_string(),
3144            item_type: item.item_type(),
3145        });
3146    }
3147
3148    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
3149        name,
3150        materialized_view: MaterializedView {
3151            create_sql,
3152            expr,
3153            dependencies: DependencyIds(dependencies),
3154            column_names,
3155            replacement_target,
3156            cluster_id,
3157            target_replica,
3158            non_null_assertions,
3159            compaction_window,
3160            refresh_schedule,
3161            as_of,
3162        },
3163        replace,
3164        drop_ids,
3165        if_not_exists,
3166        ambiguous_columns: *scx.ambiguous_columns.borrow(),
3167    }))
3168}
3169
// Option extraction for `MaterializedViewOption`. Each tuple names an option
// and the Rust type its value is converted to.
// NOTE(review): `AllowMultiple` presumably permits the option to be repeated
// and collects every occurrence -- confirm against the definition of
// `generate_extracted_config!`.
generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);
3177
3178pub fn describe_create_sink(
3179    _: &StatementContext,
3180    _: CreateSinkStatement<Aug>,
3181) -> Result<StatementDesc, PlanError> {
3182    Ok(StatementDesc::new(None))
3183}
3184
// Option extraction for `CreateSinkOption`. The generated
// `CreateSinkOptionExtracted` is destructured in `plan_sink` into `snapshot`,
// `partition_strategy`, `version` and `commit_interval` (plus a `seen` field);
// `snapshot` and `version` are treated there as optional values.
generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);
3192
3193pub fn plan_create_sink(
3194    scx: &StatementContext,
3195    stmt: CreateSinkStatement<Aug>,
3196) -> Result<Plan, PlanError> {
3197    // Check for an object in the catalog with this same name
3198    let Some(name) = stmt.name.clone() else {
3199        return Err(PlanError::MissingName(CatalogItemType::Sink));
3200    };
3201    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3202    let full_name = scx.catalog.resolve_full_name(&name);
3203    let partial_name = PartialItemName::from(full_name.clone());
3204    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3205        return Err(PlanError::ItemAlreadyExists {
3206            name: full_name.to_string(),
3207            item_type: item.item_type(),
3208        });
3209    }
3210
3211    plan_sink(scx, stmt)
3212}
3213
/// This function will plan a sink as if it does not exist in the catalog. This is so the planning
/// logic is reused by both CREATE SINK and ALTER SINK planning. It is the responsibility of the
/// callers (plan_create_sink and plan_alter_sink) to check for name collisions if this is
/// important.
///
/// The planning proceeds in this order, and the first failing step determines the error:
///
/// 1. Resolve the sink name.
/// 2. Derive the [`SinkEnvelope`] from `ENVELOPE` (Kafka) or `MODE` (Iceberg).
/// 3. Resolve and validate the `FROM` item (item type, replacement status, system object).
/// 4. Resolve the `KEY` columns (if any) to column indices and validate them.
/// 5. Apply the `MODE APPEND` restrictions (no `KEY`, no clashing column names).
/// 6. Resolve and validate the Kafka `HEADERS` column (if any).
/// 7. Extract the `WITH` options and build the connection via `kafka_sink_builder` or
///    `iceberg_sink_builder`.
/// 8. Resolve the cluster and normalize the statement into `create_sql`.
///
/// Returns a [`Plan::CreateSink`] on success.
fn plan_sink(
    scx: &StatementContext,
    mut stmt: CreateSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Destructure a clone: `stmt` itself is still needed at the end of the function, where its
    // `in_cluster` field is passed mutably to `source_sink_cluster_config` and the whole
    // statement is handed to `normalize::create_statement`.
    let CreateSinkStatement {
        name,
        in_cluster: _,
        from,
        connection,
        format,
        envelope,
        mode,
        if_not_exists,
        with_options,
    } = stmt.clone();

    let Some(name) = name else {
        return Err(PlanError::MissingName(CatalogItemType::Sink));
    };
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    // Map the connection-specific syntax (ENVELOPE vs. MODE) onto a single `SinkEnvelope`,
    // rejecting the clause that belongs to the other connection type.
    let envelope = match (&connection, envelope, mode) {
        // Kafka sinks use ENVELOPE
        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Upsert), None) => {
            SinkEnvelope::Upsert
        }
        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Debezium), None) => {
            SinkEnvelope::Debezium
        }
        (CreateSinkConnection::Kafka { .. }, None, None) => {
            sql_bail!("ENVELOPE clause is required")
        }
        (CreateSinkConnection::Kafka { .. }, _, Some(_)) => {
            sql_bail!("MODE is not supported for Kafka sinks, use ENVELOPE instead")
        }
        // Iceberg sinks use MODE
        (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Upsert)) => {
            SinkEnvelope::Upsert
        }
        (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Append)) => {
            SinkEnvelope::Append
        }
        (CreateSinkConnection::Iceberg { .. }, None, None) => {
            sql_bail!("MODE clause is required")
        }
        (CreateSinkConnection::Iceberg { .. }, Some(_), _) => {
            sql_bail!("ENVELOPE is not supported for Iceberg sinks, use MODE instead")
        }
    };

    // Keep a handle on the resolved name for error messages; `from` is shadowed by the catalog
    // item on the next line.
    let from_name = &from;
    let from = scx.get_item_by_resolved_name(&from)?;

    // Only tables, sources and materialized views can be sunk, and only when they do not have a
    // replacement target.
    {
        use CatalogItemType::*;
        match from.item_type() {
            Table | Source | MaterializedView => {
                if from.replacement_target().is_some() {
                    let name = scx.catalog.minimal_qualification(from.name());
                    return Err(PlanError::InvalidSinkFrom {
                        name: name.to_string(),
                        item_type: format!("replacement {}", from.item_type()),
                    });
                }
            }
            Sink | View | Index | Type | Func | Secret | Connection => {
                let name = scx.catalog.minimal_qualification(from.name());
                return Err(PlanError::InvalidSinkFrom {
                    name: name.to_string(),
                    item_type: from.item_type().to_string(),
                });
            }
        }
    }

    if from.id().is_system() {
        bail_unsupported!("creating a sink directly on a catalog object");
    }

    let desc = from
        .relation_desc()
        .ok_or_else(|| sql_err!("item does not have a relation description"))?;
    // Resolve the user-specified KEY columns (if any) to column indices in `desc`.
    let key_indices = match &connection {
        CreateSinkConnection::Kafka { key: Some(key), .. }
        | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
            let key_columns = key
                .key_columns
                .clone()
                .into_iter()
                .map(normalize::column_name)
                .collect::<Vec<_>>();
            // Reject a column that is listed more than once.
            let mut uniq = BTreeSet::new();
            for col in key_columns.iter() {
                if !uniq.insert(col) {
                    sql_bail!("duplicate column referenced in KEY: {}", col);
                }
            }
            // Each key column must exist in the relation and be unambiguous.
            let indices = key_columns
                .iter()
                .map(|col| {
                    let name_idx =
                        desc.get_by_name(col)
                            .map(|(idx, _type)| idx)
                            .ok_or_else(|| {
                                sql_err!("column referenced in KEY does not exist: {}", col)
                            })?;
                    if desc.get_unambiguous_name(name_idx).is_none() {
                        sql_bail!("column referenced in KEY is ambiguous: {}", col);
                    }
                    Ok(name_idx)
                })
                .collect::<Result<Vec<_>, _>>()?;

            // Iceberg equality deletes require primitive, non-float key columns.
            // Use an allow-list so that new types are rejected by default.
            if matches!(&connection, CreateSinkConnection::Iceberg { .. }) {
                let cols: Vec<_> = desc.iter().collect();
                for &idx in &indices {
                    let (col_name, col_type) = cols[idx];
                    let scalar = &col_type.scalar_type;
                    let is_valid = matches!(
                        scalar,
                        // booleans and integers
                        SqlScalarType::Bool
                            | SqlScalarType::Int16
                            | SqlScalarType::Int32
                            | SqlScalarType::Int64
                            | SqlScalarType::UInt16
                            | SqlScalarType::UInt32
                            | SqlScalarType::UInt64
                            // decimal / numeric
                            | SqlScalarType::Numeric { .. }
                            // date / time
                            | SqlScalarType::Date
                            | SqlScalarType::Time
                            | SqlScalarType::Timestamp { .. }
                            | SqlScalarType::TimestampTz { .. }
                            | SqlScalarType::Interval
                            | SqlScalarType::MzTimestamp
                            // string-like
                            | SqlScalarType::String
                            | SqlScalarType::Char { .. }
                            | SqlScalarType::VarChar { .. }
                            | SqlScalarType::PgLegacyChar
                            | SqlScalarType::PgLegacyName
                            | SqlScalarType::Bytes
                            | SqlScalarType::Jsonb
                            // identifiers
                            | SqlScalarType::Uuid
                            | SqlScalarType::Oid
                            | SqlScalarType::RegProc
                            | SqlScalarType::RegType
                            | SqlScalarType::RegClass
                            | SqlScalarType::MzAclItem
                            | SqlScalarType::AclItem
                            | SqlScalarType::Int2Vector
                    );
                    if !is_valid {
                        return Err(PlanError::IcebergSinkUnsupportedKeyType {
                            column: col_name.to_string(),
                            column_type: format!("{:?}", scalar),
                        });
                    }
                }
            }

            // The requested key is considered valid when it contains every column of at least
            // one key already known for the relation.
            let is_valid_key = desc
                .typ()
                .keys
                .iter()
                .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));

            // Only upsert sinks depend on the key being valid. With NOT ENFORCED the user takes
            // responsibility and merely receives a notice; otherwise planning fails and the
            // error lists the keys that would have been accepted.
            if !is_valid_key && envelope == SinkEnvelope::Upsert {
                if key.not_enforced {
                    scx.catalog
                        .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
                            key: key_columns.clone(),
                            name: name.item.clone(),
                        })
                } else {
                    return Err(PlanError::UpsertSinkWithInvalidKey {
                        name: from_name.full_name_str(),
                        desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
                        valid_keys: desc
                            .typ()
                            .keys
                            .iter()
                            .map(|key| {
                                key.iter()
                                    .map(|col| desc.get_name(*col).as_str().into())
                                    .collect()
                            })
                            .collect(),
                    });
                }
            }
            Some(indices)
        }
        CreateSinkConnection::Kafka { key: None, .. }
        | CreateSinkConnection::Iceberg { key: None, .. } => None,
    };

    if key_indices.is_some() && envelope == SinkEnvelope::Append {
        sql_bail!("KEY is not supported for MODE APPEND Iceberg sinks");
    }

    // Reject input columns that clash with the columns MODE APPEND adds to the Iceberg table.
    if envelope == SinkEnvelope::Append {
        if let CreateSinkConnection::Iceberg { .. } = &connection {
            use mz_storage_types::sinks::{
                ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN,
            };
            for (col_name, _) in desc.iter() {
                if col_name.as_str() == ICEBERG_APPEND_DIFF_COLUMN
                    || col_name.as_str() == ICEBERG_APPEND_TIMESTAMP_COLUMN
                {
                    sql_bail!(
                        "column {} conflicts with the system column that MODE APPEND \
                         adds to the Iceberg table",
                        col_name.quoted()
                    );
                }
            }
        }
    }

    // Resolve the Kafka HEADERS column (if any) to its index in `desc`. The option is gated
    // behind a feature flag, is rejected for ENVELOPE DEBEZIUM, and the column must be a map
    // whose values are text or bytea.
    let headers_index = match &connection {
        CreateSinkConnection::Kafka {
            headers: Some(headers),
            ..
        } => {
            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;

            match envelope {
                SinkEnvelope::Upsert | SinkEnvelope::Append => (),
                SinkEnvelope::Debezium => {
                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
                }
            };

            let headers = normalize::column_name(headers.clone());
            let (idx, ty) = desc
                .get_by_name(&headers)
                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;

            if desc.get_unambiguous_name(idx).is_none() {
                sql_bail!("HEADERS column ({}) is ambiguous", headers);
            }

            match &ty.scalar_type {
                SqlScalarType::Map { value_type, .. }
                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
                _ => sql_bail!(
                    "HEADERS column must have type map[text => text] or map[text => bytea]"
                ),
            }

            Some(idx)
        }
        _ => None,
    };

    // pick the first valid natural relation key, if any
    let relation_key_indices = desc.typ().keys.get(0).cloned();

    // Pair the key indices with a `RelationDesc` containing only the key columns, in the order
    // the user listed them.
    let key_desc_and_indices = key_indices.map(|key_indices| {
        let cols = desc
            .iter()
            .map(|(name, ty)| (name.clone(), ty.clone()))
            .collect::<Vec<_>>();
        let (names, types): (Vec<_>, Vec<_>) =
            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
        let typ = SqlRelationType::new(types);
        (RelationDesc::new(typ, names), key_indices)
    });

    // Upsert sinks cannot be planned without a user-specified key.
    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
        return Err(PlanError::UpsertSinkWithoutKey);
    }

    // `partition_strategy` is extracted but not used by this function.
    let CreateSinkOptionExtracted {
        snapshot,
        version,
        partition_strategy: _,
        seen: _,
        commit_interval,
    } = with_options.try_into()?;

    // Delegate the connection-specific validation and construction.
    let connection_builder = match connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            ..
        } => kafka_sink_builder(
            scx,
            connection,
            options,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            desc.into_owned(),
            envelope,
            from.id(),
            commit_interval,
        )?,
        CreateSinkConnection::Iceberg {
            catalog_connection,
            aws_connection,
            options,
            ..
        } => iceberg_sink_builder(
            scx,
            catalog_connection,
            aws_connection,
            options,
            relation_key_indices,
            key_desc_and_indices,
            commit_interval,
            &desc,
        )?,
    };

    // WITH SNAPSHOT defaults to true
    let with_snapshot = snapshot.unwrap_or(true);
    // VERSION defaults to 0
    let version = version.unwrap_or(0);

    // We will rewrite the cluster if one is not provided, so we must use the
    // `in_cluster` value we plan to normalize when we canonicalize the create
    // statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;

    Ok(Plan::CreateSink(CreateSinkPlan {
        name,
        sink: Sink {
            create_sql,
            from: from.global_id(),
            connection: connection_builder,
            envelope,
            version,
            commit_interval,
        },
        with_snapshot,
        if_not_exists,
        in_cluster: in_cluster.id(),
    }))
}
3567
3568fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3569    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3570
3571    let existing_keys = desc
3572        .typ()
3573        .keys
3574        .iter()
3575        .map(|key_columns| {
3576            key_columns
3577                .iter()
3578                .map(|col| desc.get_name(*col).as_str())
3579                .join(", ")
3580        })
3581        .join(", ");
3582
3583    sql_err!(
3584        "Key constraint ({}) conflicts with existing key ({})",
3585        user_keys,
3586        existing_keys
3587    )
3588}
3589
3590/// Creating this by hand instead of using generate_extracted_config! macro
3591/// because the macro doesn't support parameterized enums. See <https://github.com/MaterializeInc/database-issues/issues/6698>
3592#[derive(Debug, Default, PartialEq, Clone)]
3593pub struct CsrConfigOptionExtracted {
3594    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
3595    pub(crate) avro_key_fullname: Option<String>,
3596    pub(crate) avro_value_fullname: Option<String>,
3597    pub(crate) null_defaults: bool,
3598    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
3599    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
3600    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3601    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3602}
3603
3604impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3605    type Error = crate::plan::PlanError;
3606    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3607        let mut extracted = CsrConfigOptionExtracted::default();
3608        let mut common_doc_comments = BTreeMap::new();
3609        for option in v {
3610            if !extracted.seen.insert(option.name.clone()) {
3611                return Err(PlanError::Unstructured({
3612                    format!("{} specified more than once", option.name)
3613                }));
3614            }
3615            let option_name = option.name.clone();
3616            let option_name_str = option_name.to_ast_string_simple();
3617            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3618                option_name: option_name.to_ast_string_simple(),
3619                err: e.into(),
3620            };
3621            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3622                val.map(|s| match s {
3623                    WithOptionValue::Value(Value::String(s)) => {
3624                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3625                    }
3626                    _ => Err("must be a string".to_string()),
3627                })
3628                .transpose()
3629                .map_err(PlanError::Unstructured)
3630                .map_err(better_error)
3631            };
3632            match option.name {
3633                CsrConfigOptionName::AvroKeyFullname => {
3634                    extracted.avro_key_fullname =
3635                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3636                }
3637                CsrConfigOptionName::AvroValueFullname => {
3638                    extracted.avro_value_fullname =
3639                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3640                }
3641                CsrConfigOptionName::NullDefaults => {
3642                    extracted.null_defaults =
3643                        <bool>::try_from_value(option.value).map_err(better_error)?;
3644                }
3645                CsrConfigOptionName::AvroDocOn(doc_on) => {
3646                    let value = String::try_from_value(option.value.ok_or_else(|| {
3647                        PlanError::InvalidOptionValue {
3648                            option_name: option_name_str,
3649                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3650                        }
3651                    })?)
3652                    .map_err(better_error)?;
3653                    let key = match doc_on.identifier {
3654                        DocOnIdentifier::Column(ast::ColumnName {
3655                            relation: ResolvedItemName::Item { id, .. },
3656                            column: ResolvedColumnReference::Column { name, index: _ },
3657                        }) => DocTarget::Field {
3658                            object_id: id,
3659                            column_name: name,
3660                        },
3661                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3662                            DocTarget::Type(id)
3663                        }
3664                        _ => sql_bail!("invalid DOC ON identifier"),
3665                    };
3666
3667                    match doc_on.for_schema {
3668                        DocOnSchema::KeyOnly => {
3669                            extracted.key_doc_options.insert(key, value);
3670                        }
3671                        DocOnSchema::ValueOnly => {
3672                            extracted.value_doc_options.insert(key, value);
3673                        }
3674                        DocOnSchema::All => {
3675                            common_doc_comments.insert(key, value);
3676                        }
3677                    }
3678                }
3679                CsrConfigOptionName::KeyCompatibilityLevel => {
3680                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3681                }
3682                CsrConfigOptionName::ValueCompatibilityLevel => {
3683                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3684                }
3685            }
3686        }
3687
3688        for (key, value) in common_doc_comments {
3689            if !extracted.key_doc_options.contains_key(&key) {
3690                extracted.key_doc_options.insert(key.clone(), value.clone());
3691            }
3692            if !extracted.value_doc_options.contains_key(&key) {
3693                extracted.value_doc_options.insert(key, value);
3694            }
3695        }
3696        Ok(extracted)
3697    }
3698}
3699
3700fn iceberg_sink_builder(
3701    scx: &StatementContext,
3702    catalog_connection: ResolvedItemName,
3703    storage_connection: Option<ResolvedItemName>,
3704    options: Vec<IcebergSinkConfigOption<Aug>>,
3705    relation_key_indices: Option<Vec<usize>>,
3706    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3707    commit_interval: Option<Duration>,
3708    desc: &RelationDesc,
3709) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3710    // Reject types that arrow-rs's parquet writer cannot handle, before
3711    // sink creation. Pass the iceberg overrides so types iceberg remaps
3712    // (e.g. interval -> string) don't trip the check.
3713    ArrowBuilder::validate_desc_for_parquet(desc, iceberg_type_overrides)
3714        .map_err(|e| sql_err!("{}", e))?;
3715
3716    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3717    let catalog_connection_id = catalog_connection_item.id();
3718    if !matches!(
3719        catalog_connection_item.connection()?,
3720        Connection::IcebergCatalog(_)
3721    ) {
3722        sql_bail!(
3723            "{} is not an iceberg catalog connection",
3724            scx.catalog
3725                .resolve_full_name(catalog_connection_item.name())
3726                .to_string()
3727                .quoted()
3728        );
3729    };
3730
3731    let storage_connection_item = storage_connection
3732        .map(|c| scx.get_item_by_resolved_name(&c))
3733        .transpose()?;
3734    let storage_connection_id = storage_connection_item.as_ref().map(|c| c.id());
3735    if let Some(c) = &storage_connection_item
3736        && !matches!(c.connection()?, Connection::Aws(_))
3737    {
3738        sql_bail!(
3739            "{} is not an AWS connection",
3740            scx.catalog.resolve_full_name(c.name()).to_string().quoted()
3741        );
3742    }
3743
3744    let IcebergSinkConfigOptionExtracted {
3745        table,
3746        namespace,
3747        seen: _,
3748    }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3749
3750    let Some(table) = table else {
3751        sql_bail!("Iceberg sink must specify TABLE");
3752    };
3753    let Some(namespace) = namespace else {
3754        sql_bail!("Iceberg sink must specify NAMESPACE");
3755    };
3756    if commit_interval.is_none() {
3757        sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
3758    }
3759
3760    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3761        catalog_connection_id,
3762        catalog_connection: catalog_connection_id,
3763        storage_connection_id,
3764        storage_connection: storage_connection_id,
3765        table,
3766        namespace,
3767        relation_key_indices,
3768        key_desc_and_indices,
3769    }))
3770}
3771
/// Plans the `StorageSinkConnection` for a Kafka sink.
///
/// Checks that `connection` refers to a Kafka connection, extracts and
/// validates the sink options, maps the format specifier to concrete key and
/// value formats, and plans the optional `PARTITION BY` expression.
///
/// # Errors
///
/// Returns a `PlanError` if, among other things:
/// - `connection` is not a Kafka connection,
/// - `commit_interval` is set (it is rejected for Kafka sinks),
/// - `LEGACY IDS` is combined with an explicit ID prefix,
/// - no `TOPIC` is given,
/// - the topic metadata refresh interval exceeds the allowed maximum,
/// - the topic partition count or replication factor is not positive,
/// - the format is missing or is not supported for sinks.
fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    // Get Kafka connection.
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    // `COMMIT INTERVAL` is rejected outright for Kafka sinks.
    if commit_interval.is_some() {
        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
    }

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        mut topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    // `LEGACY IDS` and an explicit transactional ID prefix are mutually
    // exclusive; without `LEGACY IDS` the (possibly absent) prefix is used.
    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    // Same rule as above, applied to the progress group ID prefix.
    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    if topic_metadata_refresh_interval > MAX_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL {
        // This is a librdkafka-enforced restriction that, if violated,
        // would result in a runtime error for the sink.
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    } else if topic_metadata_refresh_interval < MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL {
        // We enforce a minimum of 1 second here to prevent excessive refreshes, and ensure that
        // tokio::time::interval receives a valid (positive) duration.
        // Too-small values are silently clamped rather than rejected.
        topic_metadata_refresh_interval = MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL;
    }

    // Rejects zero and negative values with a message naming the option, then
    // wraps any remaining value in `NonNeg`. An absent value stays `None`.
    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

    // Helper method to parse avro connection options for format specifiers that use avro
    // for either key or value encoding.
    //
    // Returns the ID of the schema registry connection together with the
    // extracted schema registry options.
    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        // A key fullname is meaningless when the sink has no key.
        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        // With a key present, the key and value fullnames must be given
        // together or not at all (the XOR detects exactly one being set).
        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };

    // Maps a single parsed format to the sink format for either the key
    // (`is_key == true`) or the value, given the description of the columns
    // that will be encoded with it.
    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
        // BYTES and TEXT are only accepted for a single column.
        Format::Bytes if desc.arity() == 1 => {
            let col_type = &desc.typ().column_types[0].scalar_type;
            if !mz_pgrepr::Value::can_encode_binary(col_type) {
                bail_unsupported!(format!(
                    "BYTES format with non-encodable type: {:?}",
                    col_type
                ));
            }

            Ok(KafkaSinkFormatType::Bytes)
        }
        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
        Format::Bytes | Format::Text => {
            bail_unsupported!("BYTES or TEXT format with multiple columns")
        }
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
            // Key and value schemas are generated with different doc options,
            // default fullnames ("row" vs. "envelope") and generator flags.
            let schema = if is_key {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    false,
                    options.key_doc_options,
                    options.avro_key_fullname.as_deref().unwrap_or("row"),
                    options.null_defaults,
                    Some(sink_from),
                    false,
                )?
                .schema()
                .to_string()
            } else {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    matches!(envelope, SinkEnvelope::Debezium),
                    options.value_doc_options,
                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
                    options.null_defaults,
                    Some(sink_from),
                    true,
                )?
                .schema()
                .to_string()
            };
            Ok(KafkaSinkFormatType::Avro {
                schema,
                compatibility_level: if is_key {
                    options.key_compatibility_level
                } else {
                    options.value_compatibility_level
                },
                wire_format: WireFormat::Confluent {
                    registry: Some(csr_connection),
                },
            })
        }
        format => bail_unsupported!(format!("sink format {:?}", format)),
    };

    // Plan the optional `PARTITION BY` expression against the value columns.
    let partition_by = match &partition_by {
        Some(partition_by) => {
            let mut scope = Scope::from_source(None, value_desc.iter_names());

            match envelope {
                SinkEnvelope::Upsert | SinkEnvelope::Append => (),
                SinkEnvelope::Debezium => {
                    // With the Debezium envelope, referencing any column that
                    // is not part of the key is turned into a planning error.
                    let key_indices: HashSet<_> = key_desc_and_indices
                        .as_ref()
                        .map(|(_desc, indices)| indices.as_slice())
                        .unwrap_or_default()
                        .into_iter()
                        .collect();
                    for (i, item) in scope.items.iter_mut().enumerate() {
                        if !key_indices.contains(&i) {
                            item.error_if_referenced = Some(|_table, column| {
                                PlanError::InvalidPartitionByEnvelopeDebezium {
                                    column_name: column.to_string(),
                                }
                            });
                        }
                    }
                }
            };

            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "PARTITION BY",
                scope: &scope,
                relation_type: value_desc.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            // The expression must be assignment-castable to `uint8`
            // (`SqlScalarType::UInt64`).
            let expr = plan_expr(ecx, partition_by)?.cast_to(
                ecx,
                CastContext::Assignment,
                &SqlScalarType::UInt64,
            )?;
            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;

            Some(expr)
        }
        _ => None,
    };

    // Map from the format specifier of the statement to the individual key/value formats for the sink.
    // A key format is only produced when the sink actually has a key.
    let format = match format {
        Some(FormatSpecifier::KeyValue { key, value }) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(value, &value_desc, false)?,
                key_format,
            }
        }
        // A bare format applies to both the key (if any) and the value.
        Some(FormatSpecifier::Bare(format)) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(format, &value_desc, false)?,
                key_format,
            }
        }
        None => bail_unsupported!("sink without format"),
    };

    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
        connection_id,
        connection: connection_id,
        format,
        topic: topic_name,
        relation_key_indices,
        key_desc_and_indices,
        headers_index,
        value_desc,
        partition_by,
        compression_type,
        progress_group_id,
        transactional_id,
        topic_options: KafkaTopicOptions {
            partition_count: topic_partition_count,
            replication_factor: topic_replication_factor,
            topic_config: topic_config.unwrap_or_default(),
        },
        topic_metadata_refresh_interval,
    }))
}
4063
4064pub fn describe_create_index(
4065    _: &StatementContext,
4066    _: CreateIndexStatement<Aug>,
4067) -> Result<StatementDesc, PlanError> {
4068    Ok(StatementDesc::new(None))
4069}
4070
4071pub fn plan_create_index(
4072    scx: &StatementContext,
4073    mut stmt: CreateIndexStatement<Aug>,
4074) -> Result<Plan, PlanError> {
4075    let CreateIndexStatement {
4076        name,
4077        on_name,
4078        in_cluster,
4079        key_parts,
4080        with_options,
4081        if_not_exists,
4082    } = &mut stmt;
4083    let on = scx.get_item_by_resolved_name(on_name)?;
4084
4085    {
4086        use CatalogItemType::*;
4087        match on.item_type() {
4088            Table | Source | View | MaterializedView => {
4089                if on.replacement_target().is_some() {
4090                    sql_bail!(
4091                        "index cannot be created on {} because it is a replacement {}",
4092                        on_name.full_name_str(),
4093                        on.item_type(),
4094                    );
4095                }
4096            }
4097            Sink | Index | Type | Func | Secret | Connection => {
4098                sql_bail!(
4099                    "index cannot be created on {} because it is a {}",
4100                    on_name.full_name_str(),
4101                    on.item_type(),
4102                );
4103            }
4104        }
4105    }
4106
4107    let on_desc = on
4108        .relation_desc()
4109        .ok_or_else(|| sql_err!("item does not have a relation description"))?;
4110
4111    let filled_key_parts = match key_parts {
4112        Some(kp) => kp.to_vec(),
4113        None => {
4114            // `key_parts` is None if we're creating a "default" index.
4115            // Precompute which column names are unambiguous in a single pass,
4116            // avoiding the O(n * k) cost of calling get_unambiguous_name per
4117            // key column.
4118            let mut name_counts = BTreeMap::new();
4119            for name in on_desc.iter_names() {
4120                *name_counts.entry(name).or_insert(0usize) += 1;
4121            }
4122            let key = on_desc.typ().default_key();
4123            key.iter()
4124                .map(|i| {
4125                    let name = on_desc.get_name(*i);
4126                    if name_counts.get(name).copied() == Some(1) {
4127                        Expr::Identifier(vec![name.clone().into()])
4128                    } else {
4129                        Expr::Value(Value::Number((i + 1).to_string()))
4130                    }
4131                })
4132                .collect()
4133        }
4134    };
4135    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
4136
4137    let index_name = if let Some(name) = name {
4138        QualifiedItemName {
4139            qualifiers: on.name().qualifiers.clone(),
4140            item: normalize::ident(name.clone()),
4141        }
4142    } else {
4143        let mut idx_name = QualifiedItemName {
4144            qualifiers: on.name().qualifiers.clone(),
4145            item: on.name().item.clone(),
4146        };
4147        if key_parts.is_none() {
4148            // We're trying to create the "default" index.
4149            idx_name.item += "_primary_idx";
4150        } else {
4151            // Use PG schema for automatically naming indexes:
4152            // `<table>_<_-separated indexed expressions>_idx`
4153            let index_name_col_suffix = keys
4154                .iter()
4155                .map(|k| match k {
4156                    mz_expr::MirScalarExpr::Column(i, name) => {
4157                        match (on_desc.get_unambiguous_name(*i), &name.0) {
4158                            (Some(col_name), _) => col_name.to_string(),
4159                            (None, Some(name)) => name.to_string(),
4160                            (None, None) => format!("{}", i + 1),
4161                        }
4162                    }
4163                    _ => "expr".to_string(),
4164                })
4165                .join("_");
4166            write!(idx_name.item, "_{index_name_col_suffix}_idx")
4167                .expect("write on strings cannot fail");
4168            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
4169        }
4170
4171        if !*if_not_exists {
4172            scx.catalog.find_available_name(idx_name)
4173        } else {
4174            idx_name
4175        }
4176    };
4177
4178    // Check for an object in the catalog with this same name
4179    let full_name = scx.catalog.resolve_full_name(&index_name);
4180    let partial_name = PartialItemName::from(full_name.clone());
4181    // For PostgreSQL compatibility, we need to prevent creating indexes when
4182    // there is an existing object *or* type of the same name.
4183    //
4184    // Technically, we only need to prevent coexistence of indexes and types
4185    // that have an associated relation (record types but not list/map types).
4186    // Enforcing that would be more complicated, though. It's backwards
4187    // compatible to weaken this restriction in the future.
4188    if let (Ok(item), false, false) = (
4189        scx.catalog.resolve_item_or_type(&partial_name),
4190        *if_not_exists,
4191        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4192    ) {
4193        return Err(PlanError::ItemAlreadyExists {
4194            name: full_name.to_string(),
4195            item_type: item.item_type(),
4196        });
4197    }
4198
4199    let options = plan_index_options(scx, with_options.clone())?;
4200    let cluster_id = match in_cluster {
4201        None => scx.resolve_cluster(None)?.id(),
4202        Some(in_cluster) => in_cluster.id,
4203    };
4204
4205    *in_cluster = Some(ResolvedClusterName {
4206        id: cluster_id,
4207        print_name: None,
4208    });
4209
4210    // Normalize `stmt`.
4211    *name = Some(Ident::new(index_name.item.clone())?);
4212    *key_parts = Some(filled_key_parts);
4213    let if_not_exists = *if_not_exists;
4214
4215    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4216    let compaction_window = options.iter().find_map(|o| {
4217        #[allow(irrefutable_let_patterns)]
4218        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4219            Some(lcw.clone())
4220        } else {
4221            None
4222        }
4223    });
4224
4225    Ok(Plan::CreateIndex(CreateIndexPlan {
4226        name: index_name,
4227        index: Index {
4228            create_sql,
4229            on: on.global_id(),
4230            keys,
4231            cluster_id,
4232            compaction_window,
4233        },
4234        if_not_exists,
4235    }))
4236}
4237
4238pub fn describe_create_type(
4239    _: &StatementContext,
4240    _: CreateTypeStatement<Aug>,
4241) -> Result<StatementDesc, PlanError> {
4242    Ok(StatementDesc::new(None))
4243}
4244
4245pub fn plan_create_type(
4246    scx: &StatementContext,
4247    stmt: CreateTypeStatement<Aug>,
4248) -> Result<Plan, PlanError> {
4249    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4250    let CreateTypeStatement { name, as_type, .. } = stmt;
4251
4252    fn validate_data_type(
4253        scx: &StatementContext,
4254        data_type: ResolvedDataType,
4255        as_type: &str,
4256        key: &str,
4257    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4258        let (id, modifiers) = match data_type {
4259            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4260            _ => sql_bail!(
4261                "CREATE TYPE ... AS {}option {} can only use named data types, but \
4262                        found unnamed data type {}. Use CREATE TYPE to create a named type first",
4263                as_type,
4264                key,
4265                data_type.human_readable_name(),
4266            ),
4267        };
4268
4269        let item = scx.catalog.get_item(&id);
4270        match item.type_details() {
4271            None => sql_bail!(
4272                "{} must be of class type, but received {} which is of class {}",
4273                key,
4274                scx.catalog.resolve_full_name(item.name()),
4275                item.item_type()
4276            ),
4277            Some(CatalogTypeDetails {
4278                typ: CatalogType::Char,
4279                ..
4280            }) => {
4281                bail_unsupported!("embedding char type in a list or map")
4282            }
4283            _ => {
4284                // Validate that the modifiers are actually valid.
4285                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4286
4287                Ok((id, modifiers))
4288            }
4289        }
4290    }
4291
4292    let inner = match as_type {
4293        CreateTypeAs::List { options } => {
4294            let CreateTypeListOptionExtracted {
4295                element_type,
4296                seen: _,
4297            } = CreateTypeListOptionExtracted::try_from(options)?;
4298            let element_type =
4299                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4300            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4301            CatalogType::List {
4302                element_reference: id,
4303                element_modifiers: modifiers,
4304            }
4305        }
4306        CreateTypeAs::Map { options } => {
4307            let CreateTypeMapOptionExtracted {
4308                key_type,
4309                value_type,
4310                seen: _,
4311            } = CreateTypeMapOptionExtracted::try_from(options)?;
4312            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4313            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4314            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4315            let (value_id, value_modifiers) =
4316                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4317            CatalogType::Map {
4318                key_reference: key_id,
4319                key_modifiers,
4320                value_reference: value_id,
4321                value_modifiers,
4322            }
4323        }
4324        CreateTypeAs::Record { column_defs } => {
4325            let mut fields = vec![];
4326            for column_def in column_defs {
4327                let data_type = column_def.data_type;
4328                let key = ident(column_def.name.clone());
4329                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4330                fields.push(CatalogRecordField {
4331                    name: ColumnName::from(key.clone()),
4332                    type_reference: id,
4333                    type_modifiers: modifiers,
4334                });
4335            }
4336            CatalogType::Record { fields }
4337        }
4338    };
4339
4340    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4341
4342    // Check for an object in the catalog with this same name
4343    let full_name = scx.catalog.resolve_full_name(&name);
4344    let partial_name = PartialItemName::from(full_name.clone());
4345    // For PostgreSQL compatibility, we need to prevent creating types when
4346    // there is an existing object *or* type of the same name.
4347    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4348        if item.item_type().conflicts_with_type() {
4349            return Err(PlanError::ItemAlreadyExists {
4350                name: full_name.to_string(),
4351                item_type: item.item_type(),
4352            });
4353        }
4354    }
4355
4356    Ok(Plan::CreateType(CreateTypePlan {
4357        name,
4358        typ: Type { create_sql, inner },
4359    }))
4360}
4361
// Extracted form of the `CREATE TYPE ... AS LIST` options. `plan_create_type`
// reads the `ELEMENT TYPE` option from it as `element_type`.
generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

// Extracted form of the `CREATE TYPE ... AS MAP` options. `plan_create_type`
// reads the `KEY TYPE` and `VALUE TYPE` options from it as `key_type` and
// `value_type`.
generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);
4369
/// A planned option for altering a role: either a set of role attributes or a
/// change to one of the role's variables.
#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    /// Planned role attributes.
    Attributes(PlannedRoleAttributes),
    /// A planned `SET` or `RESET` of a role variable.
    Variable(PlannedRoleVariable),
}
4375
/// Role attributes as planned from a list of `RoleAttribute`s.
///
/// Every field is `None` when the corresponding attribute was not specified.
#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    /// `Some(true)` for `INHERIT`, `Some(false)` for `NOINHERIT`.
    pub inherit: Option<bool>,
    /// The password to set, if one was supplied.
    pub password: Option<Password>,
    /// SCRAM iteration count, taken from the system variables at planning
    /// time; only set together with `password`.
    pub scram_iterations: Option<NonZeroU32>,
    /// `nopassword` is set to true if the password from the parser is `None`.
    /// This is semantically different than not supplying a password at all,
    /// to allow for unsetting a password.
    pub nopassword: Option<bool>,
    /// `Some(true)` for `SUPERUSER`, `Some(false)` for `NOSUPERUSER`.
    pub superuser: Option<bool>,
    /// `Some(true)` for `LOGIN`, `Some(false)` for `NOLOGIN`.
    pub login: Option<bool>,
}
4388
4389fn plan_role_attributes(
4390    options: Vec<RoleAttribute>,
4391    scx: &StatementContext,
4392) -> Result<PlannedRoleAttributes, PlanError> {
4393    let mut planned_attributes = PlannedRoleAttributes {
4394        inherit: None,
4395        password: None,
4396        scram_iterations: None,
4397        superuser: None,
4398        login: None,
4399        nopassword: None,
4400    };
4401
4402    for option in options {
4403        match option {
4404            RoleAttribute::Inherit | RoleAttribute::NoInherit
4405                if planned_attributes.inherit.is_some() =>
4406            {
4407                sql_bail!("conflicting or redundant options");
4408            }
4409            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4410                bail_never_supported!(
4411                    "CREATECLUSTER attribute",
4412                    "sql/create-role/#details",
4413                    "Use system privileges instead."
4414                );
4415            }
4416            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4417                bail_never_supported!(
4418                    "CREATEDB attribute",
4419                    "sql/create-role/#details",
4420                    "Use system privileges instead."
4421                );
4422            }
4423            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4424                bail_never_supported!(
4425                    "CREATEROLE attribute",
4426                    "sql/create-role/#details",
4427                    "Use system privileges instead."
4428                );
4429            }
4430            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4431                sql_bail!("conflicting or redundant options");
4432            }
4433
4434            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4435            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4436            RoleAttribute::Password(password) => {
4437                if let Some(password) = password {
4438                    planned_attributes.password = Some(password.into());
4439                    planned_attributes.scram_iterations =
4440                        Some(scx.catalog.system_vars().scram_iterations())
4441                } else {
4442                    planned_attributes.nopassword = Some(true);
4443                }
4444            }
4445            RoleAttribute::SuperUser => {
4446                if planned_attributes.superuser == Some(false) {
4447                    sql_bail!("conflicting or redundant options");
4448                }
4449                planned_attributes.superuser = Some(true);
4450            }
4451            RoleAttribute::NoSuperUser => {
4452                if planned_attributes.superuser == Some(true) {
4453                    sql_bail!("conflicting or redundant options");
4454                }
4455                planned_attributes.superuser = Some(false);
4456            }
4457            RoleAttribute::Login => {
4458                if planned_attributes.login == Some(false) {
4459                    sql_bail!("conflicting or redundant options");
4460                }
4461                planned_attributes.login = Some(true);
4462            }
4463            RoleAttribute::NoLogin => {
4464                if planned_attributes.login == Some(true) {
4465                    sql_bail!("conflicting or redundant options");
4466                }
4467                planned_attributes.login = Some(false);
4468            }
4469        }
4470    }
4471    if planned_attributes.inherit == Some(false) {
4472        bail_unsupported!("non inherit roles");
4473    }
4474
4475    Ok(planned_attributes)
4476}
4477
/// A planned change to a role variable.
#[derive(Debug)]
pub enum PlannedRoleVariable {
    /// Set the variable `name` to `value`.
    Set { name: String, value: VariableValue },
    /// Reset the variable `name`.
    Reset { name: String },
}
4483
4484impl PlannedRoleVariable {
4485    pub fn name(&self) -> &str {
4486        match self {
4487            PlannedRoleVariable::Set { name, .. } => name,
4488            PlannedRoleVariable::Reset { name } => name,
4489        }
4490    }
4491}
4492
4493fn plan_role_variable(
4494    scx: &StatementContext,
4495    variable: SetRoleVar,
4496) -> Result<PlannedRoleVariable, PlanError> {
4497    let plan = match variable {
4498        SetRoleVar::Set { name, value } => {
4499            let name = name.to_string();
4500            let value = scl::plan_set_variable_to(value)?;
4501            // Gate feature-flagged isolation levels, matching the `SET` and
4502            // connection-option paths in `SessionVars::set`.
4503            if let VariableValue::Values(values) = &value {
4504                vars::check_transaction_isolation_feature_flag(
4505                    &name,
4506                    VarInput::SqlSet(values),
4507                    scx.catalog.system_vars(),
4508                )?;
4509            }
4510            PlannedRoleVariable::Set { name, value }
4511        }
4512        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4513            name: name.to_string(),
4514        },
4515    };
4516    Ok(plan)
4517}
4518
4519pub fn describe_create_role(
4520    _: &StatementContext,
4521    _: CreateRoleStatement,
4522) -> Result<StatementDesc, PlanError> {
4523    Ok(StatementDesc::new(None))
4524}
4525
4526pub fn plan_create_role(
4527    scx: &StatementContext,
4528    CreateRoleStatement { name, options }: CreateRoleStatement,
4529) -> Result<Plan, PlanError> {
4530    let attributes = plan_role_attributes(options, scx)?;
4531    Ok(Plan::CreateRole(CreateRolePlan {
4532        name: normalize::ident(name),
4533        attributes: attributes.into(),
4534    }))
4535}
4536
4537pub fn plan_create_network_policy(
4538    ctx: &StatementContext,
4539    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4540) -> Result<Plan, PlanError> {
4541    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4542    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4543
4544    let Some(rule_defs) = policy_options.rules else {
4545        sql_bail!("RULES must be specified when creating network policies.");
4546    };
4547
4548    let mut rules = vec![];
4549    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4550        let NetworkPolicyRuleOptionExtracted {
4551            seen: _,
4552            direction,
4553            action,
4554            address,
4555        } = options.try_into()?;
4556        let (direction, action, address) = match (direction, action, address) {
4557            (Some(direction), Some(action), Some(address)) => (
4558                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4559                NetworkPolicyRuleAction::try_from(action.as_str())?,
4560                PolicyAddress::try_from(address.as_str())?,
4561            ),
4562            (_, _, _) => {
4563                sql_bail!("Direction, Address, and Action must specified when creating a rule")
4564            }
4565        };
4566        rules.push(NetworkPolicyRule {
4567            name: normalize::ident(name),
4568            direction,
4569            action,
4570            address,
4571        });
4572    }
4573
4574    if rules.len()
4575        > ctx
4576            .catalog
4577            .system_vars()
4578            .max_rules_per_network_policy()
4579            .try_into()?
4580    {
4581        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4582    }
4583
4584    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4585        name: normalize::ident(name),
4586        rules,
4587    }))
4588}
4589
4590pub fn plan_alter_network_policy(
4591    ctx: &StatementContext,
4592    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4593) -> Result<Plan, PlanError> {
4594    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4595
4596    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4597    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4598
4599    let Some(rule_defs) = policy_options.rules else {
4600        sql_bail!("RULES must be specified when creating network policies.");
4601    };
4602
4603    let mut rules = vec![];
4604    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4605        let NetworkPolicyRuleOptionExtracted {
4606            seen: _,
4607            direction,
4608            action,
4609            address,
4610        } = options.try_into()?;
4611
4612        let (direction, action, address) = match (direction, action, address) {
4613            (Some(direction), Some(action), Some(address)) => (
4614                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4615                NetworkPolicyRuleAction::try_from(action.as_str())?,
4616                PolicyAddress::try_from(address.as_str())?,
4617            ),
4618            (_, _, _) => {
4619                sql_bail!("Direction, Address, and Action must specified when creating a rule")
4620            }
4621        };
4622        rules.push(NetworkPolicyRule {
4623            name: normalize::ident(name),
4624            direction,
4625            action,
4626            address,
4627        });
4628    }
4629    if rules.len()
4630        > ctx
4631            .catalog
4632            .system_vars()
4633            .max_rules_per_network_policy()
4634            .try_into()?
4635    {
4636        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4637    }
4638
4639    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4640        id: policy.id(),
4641        name: normalize::ident(name),
4642        rules,
4643    }))
4644}
4645
4646pub fn describe_create_cluster(
4647    _: &StatementContext,
4648    _: CreateClusterStatement<Aug>,
4649) -> Result<StatementDesc, PlanError> {
4650    Ok(StatementDesc::new(None))
4651}
4652
// Declares the typed extraction of the `CREATE CLUSTER` option list. `plan_create_cluster_inner`
// obtains the resulting `ClusterOptionExtracted` via `options.try_into()` and treats every field
// as optional, which is how it tells "option not given" apart from "option given".
//
4653// WARNING:
4654// DO NOT set any `Default` value here using the built-in mechanism of `generate_extracted_config`!
4655// These options are also used in ALTER CLUSTER, where not giving an option means that the value of
4656// that option stays the same. If you were to give a default value here, then not giving that option
4657// to ALTER CLUSTER would always reset the value of that option to the default.
4658generate_extracted_config!(
4659    ClusterOption,
4660    (AvailabilityZones, Vec<String>),
4661    (Disk, bool),
4662    (IntrospectionDebugging, bool),
4663    (IntrospectionInterval, OptionalDuration),
4664    (Managed, bool),
4665    (Replicas, Vec<ReplicaDefinition<Aug>>),
4666    (ReplicationFactor, u32),
4667    (Size, String),
4668    (Schedule, ClusterScheduleOptionValue),
4669    (WorkloadClass, OptionalString)
4670);
4671
// Declares the typed extraction of the network policy option list. The planners for
// `CREATE NETWORK POLICY` and `ALTER NETWORK POLICY` convert their options into
// `NetworkPolicyOptionExtracted` and require its optional `rules` field to be present.
4672generate_extracted_config!(
4673    NetworkPolicyOption,
4674    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
4675);
4676
// Declares the typed extraction of the options of a single network policy rule. All three values
// are extracted as optional strings; the network policy planners reject a rule unless all of them
// are given and then parse each string into its typed representation.
4677generate_extracted_config!(
4678    NetworkPolicyRuleOption,
4679    (Direction, String),
4680    (Action, String),
4681    (Address, String)
4682);
4683
// Declares the typed extraction for `ClusterAlterOption`, whose only option is `Wait`.
// NOTE(review): presumably consumed by the `ALTER CLUSTER` planner, which is not part of this
// section -- confirm against its use site.
4684generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4685
// Declares the typed extraction for `ClusterAlterUntilReadyOption`: a `Timeout` duration and an
// `OnTimeout` string, neither with a default.
// NOTE(review): presumably these are the sub-options of an `ALTER CLUSTER` wait strategy; the
// accepted `OnTimeout` values are validated outside this section -- confirm against the use site.
4686generate_extracted_config!(
4687    ClusterAlterUntilReadyOption,
4688    (Timeout, Duration),
4689    (OnTimeout, String)
4690);
4691
// Declares the typed extraction of the `FEATURES` list of `CREATE CLUSTER`. Every feature is an
// `Option<bool>` that defaults to `None`. `plan_create_cluster_inner` copies the extracted values
// into the identically named fields of `OptimizerFeatureOverrides`, and `unplan_create_cluster`
// performs the reverse mapping, so a feature added here must be wired up in both places.
4692generate_extracted_config!(
4693    ClusterFeature,
4694    (ReoptimizeImportedViews, Option<bool>, Default(None)),
4695    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
4696    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
4697    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
4698    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
4699    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
4700    (
4701        EnableProjectionPushdownAfterRelationCse,
4702        Option<bool>,
4703        Default(None)
4704    )
4705);
4706
4707/// Convert a [`CreateClusterStatement`] into a [`Plan`].
4708///
4709/// The reverse of [`unplan_create_cluster`].
4710pub fn plan_create_cluster(
4711    scx: &StatementContext,
4712    stmt: CreateClusterStatement<Aug>,
4713) -> Result<Plan, PlanError> {
4714    let plan = plan_create_cluster_inner(scx, stmt)?;
4715
4716    // Roundtrip through unplan and make sure that we end up with the same plan.
4717    if let CreateClusterVariant::Managed(_) = &plan.variant {
4718        let stmt = unplan_create_cluster(scx, plan.clone())
4719            .map_err(|e| PlanError::Replan(e.to_string()))?;
4720        let create_sql = stmt.to_ast_string_stable();
4721        let stmt = parse::parse(&create_sql)
4722            .map_err(|e| PlanError::Replan(e.to_string()))?
4723            .into_element()
4724            .ast;
4725        let (stmt, _resolved_ids) =
4726            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4727        let stmt = match stmt {
4728            Statement::CreateCluster(stmt) => stmt,
4729            stmt => {
4730                return Err(PlanError::Replan(format!(
4731                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4732                )));
4733            }
4734        };
4735        let replan =
4736            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4737        if plan != replan {
4738            return Err(PlanError::Replan(format!(
4739                "replan does not match: plan={plan:?}, replan={replan:?}"
4740            )));
4741        }
4742    }
4743
4744    Ok(Plan::CreateCluster(plan))
4745}
4746
4747pub fn plan_create_cluster_inner(
4748    scx: &StatementContext,
4749    CreateClusterStatement {
4750        name,
4751        options,
4752        features,
4753    }: CreateClusterStatement<Aug>,
4754) -> Result<CreateClusterPlan, PlanError> {
4755    let ClusterOptionExtracted {
4756        availability_zones,
4757        introspection_debugging,
4758        introspection_interval,
4759        managed,
4760        replicas,
4761        replication_factor,
4762        seen: _,
4763        size,
4764        disk,
4765        schedule,
4766        workload_class,
4767    }: ClusterOptionExtracted = options.try_into()?;
4768
4769    let managed = managed.unwrap_or_else(|| replicas.is_none());
4770
4771    if !scx.catalog.active_role_id().is_system() {
4772        if !features.is_empty() {
4773            sql_bail!("FEATURES not supported for non-system users");
4774        }
4775        if workload_class.is_some() {
4776            sql_bail!("WORKLOAD CLASS not supported for non-system users");
4777        }
4778    }
4779
4780    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
4781    let workload_class = workload_class.and_then(|v| v.0);
4782
4783    if managed {
4784        if replicas.is_some() {
4785            sql_bail!("REPLICAS not supported for managed clusters");
4786        }
4787        let Some(size) = size else {
4788            sql_bail!("SIZE must be specified for managed clusters");
4789        };
4790
4791        if disk.is_some() {
4792            // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
4793            // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
4794            // we'll be able to remove the `DISK` option entirely.
4795            if scx.catalog.is_cluster_size_cc(&size) {
4796                sql_bail!(
4797                    "DISK option not supported for modern cluster sizes because disk is always enabled"
4798                );
4799            }
4800
4801            scx.catalog
4802                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
4803        }
4804
4805        let compute = plan_compute_replica_config(
4806            introspection_interval,
4807            introspection_debugging.unwrap_or(false),
4808        )?;
4809
4810        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
4811            replication_factor.unwrap_or_else(|| {
4812                scx.catalog
4813                    .system_vars()
4814                    .default_cluster_replication_factor()
4815            })
4816        } else {
4817            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
4818            if replication_factor.is_some() {
4819                sql_bail!(
4820                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
4821                );
4822            }
4823            // If we have a non-trivial schedule, then let's not have any replicas initially,
4824            // to avoid quickly going back and forth if the schedule doesn't want a replica
4825            // initially.
4826            0
4827        };
4828        let availability_zones = availability_zones.unwrap_or_default();
4829
4830        if !availability_zones.is_empty() {
4831            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
4832        }
4833
4834        // Plan OptimizerFeatureOverrides.
4835        let ClusterFeatureExtracted {
4836            reoptimize_imported_views,
4837            enable_eager_delta_joins,
4838            enable_new_outer_join_lowering,
4839            enable_variadic_left_join_lowering,
4840            enable_letrec_fixpoint_analysis,
4841            enable_join_prioritize_arranged,
4842            enable_projection_pushdown_after_relation_cse,
4843            seen: _,
4844        } = ClusterFeatureExtracted::try_from(features)?;
4845        let optimizer_feature_overrides = OptimizerFeatureOverrides {
4846            reoptimize_imported_views,
4847            enable_eager_delta_joins,
4848            enable_new_outer_join_lowering,
4849            enable_variadic_left_join_lowering,
4850            enable_letrec_fixpoint_analysis,
4851            enable_join_prioritize_arranged,
4852            enable_projection_pushdown_after_relation_cse,
4853            ..Default::default()
4854        };
4855
4856        let schedule = plan_cluster_schedule(schedule)?;
4857
4858        Ok(CreateClusterPlan {
4859            name: normalize::ident(name),
4860            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
4861                replication_factor,
4862                size,
4863                availability_zones,
4864                compute,
4865                optimizer_feature_overrides,
4866                schedule,
4867            }),
4868            workload_class,
4869        })
4870    } else {
4871        let Some(replica_defs) = replicas else {
4872            sql_bail!("REPLICAS must be specified for unmanaged clusters");
4873        };
4874        if availability_zones.is_some() {
4875            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
4876        }
4877        if replication_factor.is_some() {
4878            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
4879        }
4880        if introspection_debugging.is_some() {
4881            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
4882        }
4883        if introspection_interval.is_some() {
4884            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
4885        }
4886        if size.is_some() {
4887            sql_bail!("SIZE not supported for unmanaged clusters");
4888        }
4889        if disk.is_some() {
4890            sql_bail!("DISK not supported for unmanaged clusters");
4891        }
4892        if !features.is_empty() {
4893            sql_bail!("FEATURES not supported for unmanaged clusters");
4894        }
4895        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
4896            sql_bail!(
4897                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
4898            );
4899        }
4900
4901        let mut replicas = vec![];
4902        for ReplicaDefinition { name, options } in replica_defs {
4903            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
4904        }
4905
4906        Ok(CreateClusterPlan {
4907            name: normalize::ident(name),
4908            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
4909            workload_class,
4910        })
4911    }
4912}
4913
4914/// Convert a [`CreateClusterPlan`] into a [`CreateClusterStatement`].
4915///
4916/// The reverse of [`plan_create_cluster`].
4917pub fn unplan_create_cluster(
4918    scx: &StatementContext,
4919    CreateClusterPlan {
4920        name,
4921        variant,
4922        workload_class,
4923    }: CreateClusterPlan,
4924) -> Result<CreateClusterStatement<Aug>, PlanError> {
4925    match variant {
4926        CreateClusterVariant::Managed(CreateClusterManagedPlan {
4927            replication_factor,
4928            size,
4929            availability_zones,
4930            compute,
4931            optimizer_feature_overrides,
4932            schedule,
4933        }) => {
4934            let schedule = unplan_cluster_schedule(schedule);
4935            let OptimizerFeatureOverrides {
4936                enable_reduce_mfp_fusion: _,
4937                enable_cardinality_estimates: _,
4938                persist_fast_path_limit: _,
4939                reoptimize_imported_views,
4940                enable_eager_delta_joins,
4941                enable_new_outer_join_lowering,
4942                enable_variadic_left_join_lowering,
4943                enable_letrec_fixpoint_analysis,
4944                enable_join_prioritize_arranged,
4945                enable_projection_pushdown_after_relation_cse,
4946                enable_less_reduce_in_eqprop: _,
4947                enable_dequadratic_eqprop_map: _,
4948                enable_eq_classes_withholding_errors: _,
4949                enable_fast_path_plan_insights: _,
4950                enable_cast_elimination: _,
4951                enable_case_literal_transform: _,
4952                enable_simplify_quantified_comparisons: _,
4953                enable_coalesce_case_transform: _,
4954                enable_will_distinct_propagation: _,
4955            } = optimizer_feature_overrides;
4956            // The ones from above that don't occur below are not wired up to cluster features.
4957            let features_extracted = ClusterFeatureExtracted {
4958                // Seen is ignored when unplanning.
4959                seen: Default::default(),
4960                reoptimize_imported_views,
4961                enable_eager_delta_joins,
4962                enable_new_outer_join_lowering,
4963                enable_variadic_left_join_lowering,
4964                enable_letrec_fixpoint_analysis,
4965                enable_join_prioritize_arranged,
4966                enable_projection_pushdown_after_relation_cse,
4967            };
4968            let features = features_extracted.into_values(scx.catalog);
4969            let availability_zones = if availability_zones.is_empty() {
4970                None
4971            } else {
4972                Some(availability_zones)
4973            };
4974            let (introspection_interval, introspection_debugging) =
4975                unplan_compute_replica_config(compute);
4976            // Replication factor cannot be explicitly specified with a refresh schedule, it's
4977            // always 1 or less.
4978            let replication_factor = match &schedule {
4979                ClusterScheduleOptionValue::Manual => Some(replication_factor),
4980                ClusterScheduleOptionValue::Refresh { .. } => {
4981                    // A cluster with a refresh schedule is turned On/Off by the cluster scheduling
4982                    // policy, so its replication factor should always be 0 or 1, and CREATE/ALTER
4983                    // reject setting both a non-MANUAL schedule and a higher replication factor. If
4984                    // we nevertheless find one (e.g., a cluster left in an invalid state by an
4985                    // older version), log loudly rather than crashing the coordinator: the
4986                    // replication factor is omitted from the rendered statement regardless.
4987                    soft_assert_or_log!(
4988                        replication_factor <= 1,
4989                        "replication factor, {replication_factor:?}, must be <= 1 with a refresh schedule"
4990                    );
4991                    None
4992                }
4993            };
4994            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
4995            let options_extracted = ClusterOptionExtracted {
4996                // Seen is ignored when unplanning.
4997                seen: Default::default(),
4998                availability_zones,
4999                disk: None,
5000                introspection_debugging: Some(introspection_debugging),
5001                introspection_interval,
5002                managed: Some(true),
5003                replicas: None,
5004                replication_factor,
5005                size: Some(size),
5006                schedule: Some(schedule),
5007                workload_class,
5008            };
5009            let options = options_extracted.into_values(scx.catalog);
5010            let name = Ident::new_unchecked(name);
5011            Ok(CreateClusterStatement {
5012                name,
5013                options,
5014                features,
5015            })
5016        }
5017        CreateClusterVariant::Unmanaged(_) => {
5018            bail_unsupported!("SHOW CREATE for unmanaged clusters")
5019        }
5020    }
5021}
5022
// Declares the typed extraction of the option list of a single cluster replica, consumed by
// `plan_replica_config` as `ReplicaOptionExtracted`. `Internal` and `IntrospectionDebugging`
// default to `false`; every other option is optional.
// NOTE(review): `plan_replica_config` does not read `ComputeAddresses`, `StorageAddresses`, or
// `Workers` (they fall under the `..` of its destructuring) -- confirm whether they are accepted
// only for compatibility.
5023generate_extracted_config!(
5024    ReplicaOption,
5025    (AvailabilityZone, String),
5026    (BilledAs, String),
5027    (ComputeAddresses, Vec<String>),
5028    (ComputectlAddresses, Vec<String>),
5029    (Disk, bool),
5030    (Internal, bool, Default(false)),
5031    (IntrospectionDebugging, bool, Default(false)),
5032    (IntrospectionInterval, OptionalDuration),
5033    (Size, String),
5034    (StorageAddresses, Vec<String>),
5035    (StoragectlAddresses, Vec<String>),
5036    (Workers, u16)
5037);
5038
5039fn plan_replica_config(
5040    scx: &StatementContext,
5041    options: Vec<ReplicaOption<Aug>>,
5042) -> Result<ReplicaConfig, PlanError> {
5043    let ReplicaOptionExtracted {
5044        availability_zone,
5045        billed_as,
5046        computectl_addresses,
5047        disk,
5048        internal,
5049        introspection_debugging,
5050        introspection_interval,
5051        size,
5052        storagectl_addresses,
5053        ..
5054    }: ReplicaOptionExtracted = options.try_into()?;
5055
5056    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;
5057
5058    match (
5059        size,
5060        availability_zone,
5061        billed_as,
5062        storagectl_addresses,
5063        computectl_addresses,
5064    ) {
5065        // Common cases we expect end users to hit.
5066        (None, _, None, None, None) => {
5067            // We don't mention the unmanaged options in the error message
5068            // because they are only available in unsafe mode.
5069            sql_bail!("SIZE option must be specified");
5070        }
5071        (Some(size), availability_zone, billed_as, None, None) => {
5072            if disk.is_some() {
5073                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
5074                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
5075                // we'll be able to remove the `DISK` option entirely.
5076                if scx.catalog.is_cluster_size_cc(&size) {
5077                    sql_bail!(
5078                        "DISK option not supported for modern cluster sizes because disk is always enabled"
5079                    );
5080                }
5081
5082                scx.catalog
5083                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
5084            }
5085
5086            Ok(ReplicaConfig::Orchestrated {
5087                size,
5088                availability_zone,
5089                compute,
5090                billed_as,
5091                internal,
5092            })
5093        }
5094
5095        (None, None, None, storagectl_addresses, computectl_addresses) => {
5096            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;
5097
5098            // When manually testing Materialize in unsafe mode, it's easy to
5099            // accidentally omit one of these options, so we try to produce
5100            // helpful error messages.
5101            let Some(storagectl_addrs) = storagectl_addresses else {
5102                sql_bail!("missing STORAGECTL ADDRESSES option");
5103            };
5104            let Some(computectl_addrs) = computectl_addresses else {
5105                sql_bail!("missing COMPUTECTL ADDRESSES option");
5106            };
5107
5108            if storagectl_addrs.len() != computectl_addrs.len() {
5109                sql_bail!(
5110                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
5111                );
5112            }
5113
5114            if disk.is_some() {
5115                sql_bail!("DISK can't be specified for unorchestrated clusters");
5116            }
5117
5118            Ok(ReplicaConfig::Unorchestrated {
5119                storagectl_addrs,
5120                computectl_addrs,
5121                compute,
5122            })
5123        }
5124        _ => {
5125            // We don't bother trying to produce a more helpful error message
5126            // here because no user is likely to hit this path.
5127            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
5128        }
5129    }
5130}
5131
5132/// Convert an [`Option<OptionalDuration>`] and [`bool`] into a [`ComputeReplicaConfig`].
5133///
5134/// The reverse of [`unplan_compute_replica_config`].
5135fn plan_compute_replica_config(
5136    introspection_interval: Option<OptionalDuration>,
5137    introspection_debugging: bool,
5138) -> Result<ComputeReplicaConfig, PlanError> {
5139    let introspection_interval = introspection_interval
5140        .map(|OptionalDuration(i)| i)
5141        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
5142    let introspection = match introspection_interval {
5143        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5144            interval,
5145            debugging: introspection_debugging,
5146        }),
5147        None if introspection_debugging => {
5148            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5149        }
5150        None => None,
5151    };
5152    let compute = ComputeReplicaConfig { introspection };
5153    Ok(compute)
5154}
5155
5156/// Convert a [`ComputeReplicaConfig`] into an [`Option<OptionalDuration>`] and [`bool`].
5157///
5158/// The reverse of [`plan_compute_replica_config`].
5159fn unplan_compute_replica_config(
5160    compute_replica_config: ComputeReplicaConfig,
5161) -> (Option<OptionalDuration>, bool) {
5162    match compute_replica_config.introspection {
5163        Some(ComputeReplicaIntrospectionConfig {
5164            debugging,
5165            interval,
5166        }) => (Some(OptionalDuration(Some(interval))), debugging),
5167        None => (Some(OptionalDuration(None)), false),
5168    }
5169}
5170
5171/// Convert a [`ClusterScheduleOptionValue`] into a [`ClusterSchedule`].
5172///
5173/// The reverse of [`unplan_cluster_schedule`].
5174fn plan_cluster_schedule(
5175    schedule: ClusterScheduleOptionValue,
5176) -> Result<ClusterSchedule, PlanError> {
5177    Ok(match schedule {
5178        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5179        // If `HYDRATION TIME ESTIMATE` is not explicitly given, we default to 0.
5180        ClusterScheduleOptionValue::Refresh {
5181            hydration_time_estimate: None,
5182        } => ClusterSchedule::Refresh {
5183            hydration_time_estimate: Duration::from_millis(0),
5184        },
5185        // Otherwise we convert the `IntervalValue` to a `Duration`.
5186        ClusterScheduleOptionValue::Refresh {
5187            hydration_time_estimate: Some(interval_value),
5188        } => {
5189            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5190            if interval.as_microseconds() < 0 {
5191                sql_bail!(
5192                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5193                    interval
5194                );
5195            }
5196            if interval.months != 0 {
5197                // This limitation is because we want this interval to be cleanly convertable
5198                // to a unix epoch timestamp difference. When the interval involves months, then
5199                // this is not true anymore, because months have variable lengths.
5200                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5201            }
5202            let duration = interval.duration()?;
5203            if u64::try_from(duration.as_millis()).is_err()
5204                || Interval::from_duration(&duration).is_err()
5205            {
5206                sql_bail!("HYDRATION TIME ESTIMATE too large");
5207            }
5208            ClusterSchedule::Refresh {
5209                hydration_time_estimate: duration,
5210            }
5211        }
5212    })
5213}
5214
5215/// Convert a [`ClusterSchedule`] into a [`ClusterScheduleOptionValue`].
5216///
5217/// The reverse of [`plan_cluster_schedule`].
5218fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5219    match schedule {
5220        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5221        ClusterSchedule::Refresh {
5222            hydration_time_estimate,
5223        } => {
5224            let interval = Interval::from_duration(&hydration_time_estimate)
5225                .expect("planning ensured that this is convertible back to Interval");
5226            let interval_value = literal::unplan_interval(&interval);
5227            ClusterScheduleOptionValue::Refresh {
5228                hydration_time_estimate: Some(interval_value),
5229            }
5230        }
5231    }
5232}
5233
5234pub fn describe_create_cluster_replica(
5235    _: &StatementContext,
5236    _: CreateClusterReplicaStatement<Aug>,
5237) -> Result<StatementDesc, PlanError> {
5238    Ok(StatementDesc::new(None))
5239}
5240
5241pub fn plan_create_cluster_replica(
5242    scx: &StatementContext,
5243    CreateClusterReplicaStatement {
5244        definition: ReplicaDefinition { name, options },
5245        of_cluster,
5246    }: CreateClusterReplicaStatement<Aug>,
5247) -> Result<Plan, PlanError> {
5248    let cluster = scx
5249        .catalog
5250        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5251
5252    let config = plan_replica_config(scx, options)?;
5253
5254    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5255        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5256            return Err(PlanError::MangedReplicaName(name.into_string()));
5257        }
5258    } else {
5259        ensure_cluster_is_not_managed(scx, cluster.id())?;
5260    }
5261
5262    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5263        name: normalize::ident(name),
5264        cluster_id: cluster.id(),
5265        config,
5266    }))
5267}
5268
5269pub fn describe_create_secret(
5270    _: &StatementContext,
5271    _: CreateSecretStatement<Aug>,
5272) -> Result<StatementDesc, PlanError> {
5273    Ok(StatementDesc::new(None))
5274}
5275
5276pub fn plan_create_secret(
5277    scx: &StatementContext,
5278    stmt: CreateSecretStatement<Aug>,
5279) -> Result<Plan, PlanError> {
5280    let CreateSecretStatement {
5281        name,
5282        if_not_exists,
5283        value,
5284    } = &stmt;
5285
5286    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5287    let mut create_sql_statement = stmt.clone();
5288    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5289    let create_sql =
5290        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5291    let secret_as = query::plan_secret_as(scx, value.clone())?;
5292
5293    let secret = Secret {
5294        create_sql,
5295        secret_as,
5296    };
5297
5298    Ok(Plan::CreateSecret(CreateSecretPlan {
5299        name,
5300        secret,
5301        if_not_exists: *if_not_exists,
5302    }))
5303}
5304
5305pub fn describe_create_connection(
5306    _: &StatementContext,
5307    _: CreateConnectionStatement<Aug>,
5308) -> Result<StatementDesc, PlanError> {
5309    Ok(StatementDesc::new(None))
5310}
5311
// Declares the typed extraction of the `WITH (...)` options of `CREATE CONNECTION`. The single
// `Validate` option has no default, so `plan_create_connection` can tell an explicit setting
// (which requires a feature flag) apart from an omitted one (which uses the system default).
5312generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5313
5314pub fn plan_create_connection(
5315    scx: &StatementContext,
5316    mut stmt: CreateConnectionStatement<Aug>,
5317) -> Result<Plan, PlanError> {
5318    let CreateConnectionStatement {
5319        name,
5320        connection_type,
5321        values,
5322        if_not_exists,
5323        with_options,
5324    } = stmt.clone();
5325    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5326    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5327    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5328
5329    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5330    if options.validate.is_some() {
5331        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5332    }
5333    let validate = match options.validate {
5334        Some(val) => val,
5335        None => {
5336            scx.catalog
5337                .system_vars()
5338                .enable_default_connection_validation()
5339                && details.to_connection().validate_by_default()
5340        }
5341    };
5342
5343    // Check for an object in the catalog with this same name
5344    let full_name = scx.catalog.resolve_full_name(&name);
5345    let partial_name = PartialItemName::from(full_name.clone());
5346    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5347        return Err(PlanError::ItemAlreadyExists {
5348            name: full_name.to_string(),
5349            item_type: item.item_type(),
5350        });
5351    }
5352
5353    // For SSH connections, overwrite the public key options based on the
5354    // connection details, in case we generated new keys during planning.
5355    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5356        stmt.values.retain(|v| {
5357            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5358        });
5359        stmt.values.push(ConnectionOption {
5360            name: ConnectionOptionName::PublicKey1,
5361            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5362        });
5363        stmt.values.push(ConnectionOption {
5364            name: ConnectionOptionName::PublicKey2,
5365            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5366        });
5367    }
5368    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5369
5370    let plan = CreateConnectionPlan {
5371        name,
5372        if_not_exists,
5373        connection: crate::plan::Connection {
5374            create_sql,
5375            details,
5376        },
5377        validate,
5378    };
5379    Ok(Plan::CreateConnection(plan))
5380}
5381
5382fn plan_drop_database(
5383    scx: &StatementContext,
5384    if_exists: bool,
5385    name: &UnresolvedDatabaseName,
5386    cascade: bool,
5387) -> Result<Option<DatabaseId>, PlanError> {
5388    Ok(match resolve_database(scx, name, if_exists)? {
5389        Some(database) => {
5390            if !cascade && database.has_schemas() {
5391                sql_bail!(
5392                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5393                    name,
5394                );
5395            }
5396            Some(database.id())
5397        }
5398        None => None,
5399    })
5400}
5401
5402pub fn describe_drop_objects(
5403    _: &StatementContext,
5404    _: DropObjectsStatement,
5405) -> Result<StatementDesc, PlanError> {
5406    Ok(StatementDesc::new(None))
5407}
5408
5409pub fn plan_drop_objects(
5410    scx: &mut StatementContext,
5411    DropObjectsStatement {
5412        object_type,
5413        if_exists,
5414        names,
5415        cascade,
5416    }: DropObjectsStatement,
5417) -> Result<Plan, PlanError> {
5418    if object_type == mz_sql_parser::ast::ObjectType::Func {
5419        bail_unsupported!("DROP FUNCTION");
5420    }
5421    let object_type = object_type.into();
5422
5423    let mut referenced_ids = Vec::new();
5424    for name in names {
5425        let id = match &name {
5426            UnresolvedObjectName::Cluster(name) => {
5427                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5428            }
5429            UnresolvedObjectName::ClusterReplica(name) => {
5430                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5431            }
5432            UnresolvedObjectName::Database(name) => {
5433                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5434            }
5435            UnresolvedObjectName::Schema(name) => {
5436                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5437            }
5438            UnresolvedObjectName::Role(name) => {
5439                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5440            }
5441            UnresolvedObjectName::Item(name) => {
5442                // Defer the dependency check until all names are resolved, so a
5443                // dependent that is itself being dropped in this same statement
5444                // does not block a non-cascade drop.
5445                plan_drop_item_name(scx, object_type, if_exists, name.clone())?.map(ObjectId::Item)
5446            }
5447            UnresolvedObjectName::NetworkPolicy(name) => {
5448                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5449            }
5450        };
5451        match id {
5452            Some(id) => referenced_ids.push(id),
5453            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5454                name: name.to_ast_string_simple(),
5455                object_type,
5456            }),
5457        }
5458    }
5459
5460    // Now that the full set of explicitly-named items is known, run the
5461    // non-cascade dependency check. A dependent that is itself being dropped in
5462    // this statement does not block the drop, matching PostgreSQL.
5463    if !cascade {
5464        let dropped_items: BTreeSet<CatalogItemId> = referenced_ids
5465            .iter()
5466            .filter_map(|id| match id {
5467                ObjectId::Item(id) => Some(*id),
5468                _ => None,
5469            })
5470            .collect();
5471        for id in &dropped_items {
5472            let catalog_item = scx.catalog.get_item(id);
5473            ensure_no_blocking_dependents(scx, object_type, catalog_item, &dropped_items)?;
5474        }
5475    }
5476
5477    let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5478
5479    Ok(Plan::DropObjects(DropObjectsPlan {
5480        referenced_ids,
5481        drop_ids,
5482        object_type,
5483    }))
5484}
5485
5486fn plan_drop_schema(
5487    scx: &StatementContext,
5488    if_exists: bool,
5489    name: &UnresolvedSchemaName,
5490    cascade: bool,
5491) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5492    // Special case for mz_temp: with lazy temporary schema creation, the temp
5493    // schema may not exist yet, but we still need to return the correct error.
5494    // Check the schema name directly against MZ_TEMP_SCHEMA.
5495    let normalized = normalize::unresolved_schema_name(name.clone())?;
5496    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
5497        sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5498    }
5499
5500    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5501        Some((database_spec, schema_spec)) => {
5502            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5503                sql_bail!(
5504                    "cannot drop schema {name} because it is required by the database system",
5505                );
5506            }
5507            if let SchemaSpecifier::Temporary = schema_spec {
5508                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5509            }
5510            let schema = scx.get_schema(&database_spec, &schema_spec);
5511            if !cascade && schema.has_items() {
5512                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5513                sql_bail!(
5514                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
5515                    full_schema_name
5516                );
5517            }
5518            Some((database_spec, schema_spec))
5519        }
5520        None => None,
5521    })
5522}
5523
5524fn plan_drop_role(
5525    scx: &StatementContext,
5526    if_exists: bool,
5527    name: &Ident,
5528) -> Result<Option<RoleId>, PlanError> {
5529    match scx.catalog.resolve_role(name.as_str()) {
5530        Ok(role) => {
5531            let id = role.id();
5532            if &id == scx.catalog.active_role_id() {
5533                sql_bail!("current role cannot be dropped");
5534            }
5535            for role in scx.catalog.get_roles() {
5536                for (member_id, grantor_id) in role.membership() {
5537                    if &id == grantor_id {
5538                        let member_role = scx.catalog.get_role(member_id);
5539                        sql_bail!(
5540                            "cannot drop role {}: still depended up by membership of role {} in role {}",
5541                            name.as_str(),
5542                            role.name(),
5543                            member_role.name()
5544                        );
5545                    }
5546                }
5547            }
5548            Ok(Some(role.id()))
5549        }
5550        Err(_) if if_exists => Ok(None),
5551        Err(e) => Err(e.into()),
5552    }
5553}
5554
5555fn plan_drop_cluster(
5556    scx: &StatementContext,
5557    if_exists: bool,
5558    name: &Ident,
5559    cascade: bool,
5560) -> Result<Option<ClusterId>, PlanError> {
5561    Ok(match resolve_cluster(scx, name, if_exists)? {
5562        Some(cluster) => {
5563            if !cascade && !cluster.bound_objects().is_empty() {
5564                return Err(PlanError::DependentObjectsStillExist {
5565                    object_type: "cluster".to_string(),
5566                    object_name: cluster.name().to_string(),
5567                    dependents: Vec::new(),
5568                });
5569            }
5570            Some(cluster.id())
5571        }
5572        None => None,
5573    })
5574}
5575
5576fn plan_drop_network_policy(
5577    scx: &StatementContext,
5578    if_exists: bool,
5579    name: &Ident,
5580) -> Result<Option<NetworkPolicyId>, PlanError> {
5581    match scx.catalog.resolve_network_policy(name.as_str()) {
5582        Ok(policy) => {
5583            // TODO(network_policy): When we support role based network policies, check if any role
5584            // currently has the specified policy set.
5585            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5586                Err(PlanError::NetworkPolicyInUse)
5587            } else {
5588                Ok(Some(policy.id()))
5589            }
5590        }
5591        Err(_) if if_exists => Ok(None),
5592        Err(e) => Err(e.into()),
5593    }
5594}
5595
5596fn plan_drop_cluster_replica(
5597    scx: &StatementContext,
5598    if_exists: bool,
5599    name: &QualifiedReplica,
5600) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5601    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5602    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5603}
5604
5605/// Returns the [`CatalogItemId`] of the item we should drop, if it exists.
5606fn plan_drop_item(
5607    scx: &StatementContext,
5608    object_type: ObjectType,
5609    if_exists: bool,
5610    name: UnresolvedItemName,
5611    cascade: bool,
5612) -> Result<Option<CatalogItemId>, PlanError> {
5613    let Some(id) = plan_drop_item_name(scx, object_type, if_exists, name)? else {
5614        return Ok(None);
5615    };
5616    if !cascade {
5617        let catalog_item = scx.catalog.get_item(&id);
5618        ensure_no_blocking_dependents(scx, object_type, catalog_item, &BTreeSet::new())?;
5619    }
5620    Ok(Some(id))
5621}
5622
5623/// Resolves `name` to the [`CatalogItemId`] of the item to drop, performing the
5624/// system-object check but *not* the dependency check. Returns `None` if the
5625/// item does not exist and `if_exists` is set.
5626fn plan_drop_item_name(
5627    scx: &StatementContext,
5628    object_type: ObjectType,
5629    if_exists: bool,
5630    name: UnresolvedItemName,
5631) -> Result<Option<CatalogItemId>, PlanError> {
5632    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5633        Ok(r) => r,
5634        // Return a more helpful error on `DROP VIEW <materialized-view>`.
5635        Err(PlanError::MismatchedObjectType {
5636            name,
5637            is_type: ObjectType::MaterializedView,
5638            expected_type: ObjectType::View,
5639        }) => {
5640            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5641        }
5642        e => e?,
5643    };
5644
5645    Ok(match resolved {
5646        Some(catalog_item) => {
5647            if catalog_item.id().is_system() {
5648                sql_bail!(
5649                    "cannot drop {} {} because it is required by the database system",
5650                    catalog_item.item_type(),
5651                    scx.catalog.minimal_qualification(catalog_item.name()),
5652                );
5653            }
5654            Some(catalog_item.id())
5655        }
5656        None => None,
5657    })
5658}
5659
5660/// Errors if dropping `catalog_item` would leave a dangling dependent, i.e. an
5661/// object that depends on it and is not itself being dropped. Dependents whose
5662/// ids are in `also_dropped` are ignored, since they are being dropped as part
5663/// of the same statement.
5664fn ensure_no_blocking_dependents(
5665    scx: &StatementContext,
5666    object_type: ObjectType,
5667    catalog_item: &dyn CatalogItem,
5668    also_dropped: &BTreeSet<CatalogItemId>,
5669) -> Result<(), PlanError> {
5670    for id in catalog_item.used_by() {
5671        if also_dropped.contains(id) {
5672            continue;
5673        }
5674        let dep = scx.catalog.get_item(id);
5675        if dependency_prevents_drop(object_type, dep) {
5676            return Err(PlanError::DependentObjectsStillExist {
5677                object_type: catalog_item.item_type().to_string(),
5678                object_name: scx
5679                    .catalog
5680                    .minimal_qualification(catalog_item.name())
5681                    .to_string(),
5682                dependents: vec![(
5683                    dep.item_type().to_string(),
5684                    scx.catalog.minimal_qualification(dep.name()).to_string(),
5685                )],
5686            });
5687        }
5688    }
5689    // TODO(jkosh44) It would be nice to also check if any active subscribe or pending peek
5690    //  relies on entry. Unfortunately, we don't have that information readily available.
5691    Ok(())
5692}
5693
5694/// Does the dependency `dep` prevent a drop of a non-cascade query?
5695fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5696    match object_type {
5697        ObjectType::Type => true,
5698        ObjectType::Table
5699        | ObjectType::View
5700        | ObjectType::MaterializedView
5701        | ObjectType::Source
5702        | ObjectType::Sink
5703        | ObjectType::Index
5704        | ObjectType::Role
5705        | ObjectType::Cluster
5706        | ObjectType::ClusterReplica
5707        | ObjectType::Secret
5708        | ObjectType::Connection
5709        | ObjectType::Database
5710        | ObjectType::Schema
5711        | ObjectType::Func
5712        | ObjectType::NetworkPolicy => match dep.item_type() {
5713            CatalogItemType::Func
5714            | CatalogItemType::Table
5715            | CatalogItemType::Source
5716            | CatalogItemType::View
5717            | CatalogItemType::MaterializedView
5718            | CatalogItemType::Sink
5719            | CatalogItemType::Type
5720            | CatalogItemType::Secret
5721            | CatalogItemType::Connection => true,
5722            CatalogItemType::Index => false,
5723        },
5724    }
5725}
5726
5727pub fn describe_alter_index_options(
5728    _: &StatementContext,
5729    _: AlterIndexStatement<Aug>,
5730) -> Result<StatementDesc, PlanError> {
5731    Ok(StatementDesc::new(None))
5732}
5733
5734pub fn describe_drop_owned(
5735    _: &StatementContext,
5736    _: DropOwnedStatement<Aug>,
5737) -> Result<StatementDesc, PlanError> {
5738    Ok(StatementDesc::new(None))
5739}
5740
5741pub fn plan_drop_owned(
5742    scx: &StatementContext,
5743    drop: DropOwnedStatement<Aug>,
5744) -> Result<Plan, PlanError> {
5745    let cascade = drop.cascade();
5746    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5747    let mut drop_ids = Vec::new();
5748    let mut privilege_revokes = Vec::new();
5749    let mut default_privilege_revokes = Vec::new();
5750
5751    fn update_privilege_revokes(
5752        object_id: SystemObjectId,
5753        privileges: &PrivilegeMap,
5754        role_ids: &BTreeSet<RoleId>,
5755        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5756    ) {
5757        privilege_revokes.extend(iter::zip(
5758            iter::repeat(object_id),
5759            privileges
5760                .all_values()
5761                .filter(|privilege| role_ids.contains(&privilege.grantee))
5762                .cloned(),
5763        ));
5764    }
5765
5766    // Replicas
5767    for replica in scx.catalog.get_cluster_replicas() {
5768        if role_ids.contains(&replica.owner_id()) {
5769            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5770        }
5771    }
5772
5773    // Clusters
5774    for cluster in scx.catalog.get_clusters() {
5775        if role_ids.contains(&cluster.owner_id()) {
5776            // Note: CASCADE is not required for replicas.
5777            if !cascade {
5778                let non_owned_bound_objects: Vec<_> = cluster
5779                    .bound_objects()
5780                    .into_iter()
5781                    .map(|item_id| scx.catalog.get_item(item_id))
5782                    .filter(|item| !role_ids.contains(&item.owner_id()))
5783                    .collect();
5784                if !non_owned_bound_objects.is_empty() {
5785                    let names: Vec<_> = non_owned_bound_objects
5786                        .into_iter()
5787                        .map(|item| {
5788                            (
5789                                item.item_type().to_string(),
5790                                scx.catalog.resolve_full_name(item.name()).to_string(),
5791                            )
5792                        })
5793                        .collect();
5794                    return Err(PlanError::DependentObjectsStillExist {
5795                        object_type: "cluster".to_string(),
5796                        object_name: cluster.name().to_string(),
5797                        dependents: names,
5798                    });
5799                }
5800            }
5801            drop_ids.push(cluster.id().into());
5802        }
5803        update_privilege_revokes(
5804            SystemObjectId::Object(cluster.id().into()),
5805            cluster.privileges(),
5806            &role_ids,
5807            &mut privilege_revokes,
5808        );
5809    }
5810
5811    // Items
5812    for item in scx.catalog.get_items() {
5813        if role_ids.contains(&item.owner_id()) {
5814            if !cascade {
5815                // Checks if any items still depend on this one, returning an error if so.
5816                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5817                    let non_owned_dependencies: Vec<_> = used_by
5818                        .into_iter()
5819                        .map(|item_id| scx.catalog.get_item(item_id))
5820                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5821                        .filter(|item| !role_ids.contains(&item.owner_id()))
5822                        .collect();
5823                    if !non_owned_dependencies.is_empty() {
5824                        let names: Vec<_> = non_owned_dependencies
5825                            .into_iter()
5826                            .map(|item| {
5827                                let item_typ = item.item_type().to_string();
5828                                let item_name =
5829                                    scx.catalog.resolve_full_name(item.name()).to_string();
5830                                (item_typ, item_name)
5831                            })
5832                            .collect();
5833                        Err(PlanError::DependentObjectsStillExist {
5834                            object_type: item.item_type().to_string(),
5835                            object_name: scx
5836                                .catalog
5837                                .resolve_full_name(item.name())
5838                                .to_string()
5839                                .to_string(),
5840                            dependents: names,
5841                        })
5842                    } else {
5843                        Ok(())
5844                    }
5845                };
5846
5847                // When this item gets dropped it will also drop its progress source, so we need to
5848                // check the users of those.
5849                if let Some(id) = item.progress_id() {
5850                    let progress_item = scx.catalog.get_item(&id);
5851                    check_if_dependents_exist(progress_item.used_by())?;
5852                }
5853                check_if_dependents_exist(item.used_by())?;
5854            }
5855            drop_ids.push(item.id().into());
5856        }
5857        update_privilege_revokes(
5858            SystemObjectId::Object(item.id().into()),
5859            item.privileges(),
5860            &role_ids,
5861            &mut privilege_revokes,
5862        );
5863    }
5864
5865    // Schemas
5866    for schema in scx.catalog.get_schemas() {
5867        if !schema.id().is_temporary() {
5868            if role_ids.contains(&schema.owner_id()) {
5869                if !cascade {
5870                    let non_owned_dependencies: Vec<_> = schema
5871                        .item_ids()
5872                        .map(|item_id| scx.catalog.get_item(&item_id))
5873                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5874                        .filter(|item| !role_ids.contains(&item.owner_id()))
5875                        .collect();
5876                    if !non_owned_dependencies.is_empty() {
5877                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5878                        sql_bail!(
5879                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5880                            full_schema_name.to_string().quoted()
5881                        );
5882                    }
5883                }
5884                drop_ids.push((*schema.database(), *schema.id()).into())
5885            }
5886            update_privilege_revokes(
5887                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5888                schema.privileges(),
5889                &role_ids,
5890                &mut privilege_revokes,
5891            );
5892        }
5893    }
5894
5895    // Databases
5896    for database in scx.catalog.get_databases() {
5897        if role_ids.contains(&database.owner_id()) {
5898            if !cascade {
5899                let non_owned_schemas: Vec<_> = database
5900                    .schemas()
5901                    .into_iter()
5902                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
5903                    .collect();
5904                if !non_owned_schemas.is_empty() {
5905                    sql_bail!(
5906                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5907                        database.name().quoted(),
5908                    );
5909                }
5910            }
5911            drop_ids.push(database.id().into());
5912        }
5913        update_privilege_revokes(
5914            SystemObjectId::Object(database.id().into()),
5915            database.privileges(),
5916            &role_ids,
5917            &mut privilege_revokes,
5918        );
5919    }
5920
5921    // Network policies
5922    for network_policy in scx.catalog.get_network_policies() {
5923        if role_ids.contains(&network_policy.owner_id()) {
5924            drop_ids.push(ObjectId::NetworkPolicy(network_policy.id()));
5925        }
5926        update_privilege_revokes(
5927            SystemObjectId::Object(ObjectId::NetworkPolicy(network_policy.id())),
5928            network_policy.privileges(),
5929            &role_ids,
5930            &mut privilege_revokes,
5931        );
5932    }
5933
5934    // System
5935    update_privilege_revokes(
5936        SystemObjectId::System,
5937        scx.catalog.get_system_privileges(),
5938        &role_ids,
5939        &mut privilege_revokes,
5940    );
5941
5942    for (default_privilege_object, default_privilege_acl_items) in
5943        scx.catalog.get_default_privileges()
5944    {
5945        for default_privilege_acl_item in default_privilege_acl_items {
5946            if role_ids.contains(&default_privilege_object.role_id)
5947                || role_ids.contains(&default_privilege_acl_item.grantee)
5948            {
5949                default_privilege_revokes.push((
5950                    default_privilege_object.clone(),
5951                    default_privilege_acl_item.clone(),
5952                ));
5953            }
5954        }
5955    }
5956
5957    let drop_ids = scx.catalog.object_dependents(&drop_ids);
5958
5959    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5960    if !system_ids.is_empty() {
5961        let mut owners = system_ids
5962            .into_iter()
5963            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5964            .collect::<BTreeSet<_>>()
5965            .into_iter()
5966            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5967        sql_bail!(
5968            "cannot drop objects owned by role {} because they are required by the database system",
5969            owners.join(", "),
5970        );
5971    }
5972
5973    Ok(Plan::DropOwned(DropOwnedPlan {
5974        role_ids: role_ids.into_iter().collect(),
5975        drop_ids,
5976        privilege_revokes,
5977        default_privilege_revokes,
5978    }))
5979}
5980
5981fn plan_retain_history_option(
5982    scx: &StatementContext,
5983    retain_history: Option<OptionalDuration>,
5984) -> Result<Option<CompactionWindow>, PlanError> {
5985    if let Some(OptionalDuration(lcw)) = retain_history {
5986        Ok(Some(plan_retain_history(scx, lcw)?))
5987    } else {
5988        Ok(None)
5989    }
5990}
5991
5992// Convert a specified RETAIN HISTORY option into a compaction window. `None` corresponds to
5993// `DisableCompaction`. A zero duration will error. This is because the `OptionalDuration` type
5994// already converts the zero duration into `None`. This function must not be called in the `RESET
5995// (RETAIN HISTORY)` path, which should be handled by the outer `Option<OptionalDuration>` being
5996// `None`.
5997fn plan_retain_history(
5998    scx: &StatementContext,
5999    lcw: Option<Duration>,
6000) -> Result<CompactionWindow, PlanError> {
6001    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
6002    match lcw {
6003        // A zero duration has already been converted to `None` by `OptionalDuration` (and means
6004        // disable compaction), and should never occur here. Furthermore, some things actually do
6005        // break when this is set to real zero:
6006        // https://github.com/MaterializeInc/database-issues/issues/3798.
6007        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
6008            option_name: "RETAIN HISTORY".to_string(),
6009            err: Box::new(PlanError::Unstructured(
6010                "internal error: unexpectedly zero".to_string(),
6011            )),
6012        }),
6013        Some(duration) => {
6014            // Error if the duration is low and enable_unlimited_retain_history is not set (which
6015            // should only be possible during testing).
6016            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
6017                && scx
6018                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
6019                    .is_err()
6020            {
6021                return Err(PlanError::RetainHistoryLow {
6022                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
6023                });
6024            }
6025            Ok(duration.try_into()?)
6026        }
6027        // In the past `RETAIN HISTORY FOR '0'` meant disable compaction. Disabling compaction seems
6028        // to be a bad choice, so prevent it.
6029        None => {
6030            if scx
6031                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
6032                .is_err()
6033            {
6034                Err(PlanError::RetainHistoryRequired)
6035            } else {
6036                Ok(CompactionWindow::DisableCompaction)
6037            }
6038        }
6039    }
6040}
6041
6042generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
6043
6044fn plan_index_options(
6045    scx: &StatementContext,
6046    with_opts: Vec<IndexOption<Aug>>,
6047) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
6048    if !with_opts.is_empty() {
6049        // Index options are not durable.
6050        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
6051    }
6052
6053    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
6054
6055    let mut out = Vec::with_capacity(1);
6056    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6057        out.push(crate::plan::IndexOption::RetainHistory(cw));
6058    }
6059    Ok(out)
6060}
6061
6062generate_extracted_config!(
6063    TableOption,
6064    (PartitionBy, Vec<Ident>),
6065    (RetainHistory, OptionalDuration),
6066    (RedactedTest, String)
6067);
6068
6069fn plan_table_options(
6070    scx: &StatementContext,
6071    desc: &RelationDesc,
6072    with_opts: Vec<TableOption<Aug>>,
6073) -> Result<Vec<crate::plan::TableOption>, PlanError> {
6074    let TableOptionExtracted {
6075        partition_by,
6076        retain_history,
6077        redacted_test,
6078        ..
6079    }: TableOptionExtracted = with_opts.try_into()?;
6080
6081    if let Some(partition_by) = partition_by {
6082        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
6083        check_partition_by(desc, partition_by)?;
6084    }
6085
6086    if redacted_test.is_some() {
6087        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
6088    }
6089
6090    let mut out = Vec::with_capacity(1);
6091    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6092        out.push(crate::plan::TableOption::RetainHistory(cw));
6093    }
6094    Ok(out)
6095}
6096
6097pub fn plan_alter_index_options(
6098    scx: &mut StatementContext,
6099    AlterIndexStatement {
6100        index_name,
6101        if_exists,
6102        action,
6103    }: AlterIndexStatement<Aug>,
6104) -> Result<Plan, PlanError> {
6105    let object_type = ObjectType::Index;
6106    match action {
6107        AlterIndexAction::ResetOptions(options) => {
6108            let mut options = options.into_iter();
6109            if let Some(opt) = options.next() {
6110                match opt {
6111                    IndexOptionName::RetainHistory => {
6112                        if options.next().is_some() {
6113                            sql_bail!("RETAIN HISTORY must be only option");
6114                        }
6115                        return alter_retain_history(
6116                            scx,
6117                            object_type,
6118                            if_exists,
6119                            UnresolvedObjectName::Item(index_name),
6120                            None,
6121                        );
6122                    }
6123                }
6124            }
6125            sql_bail!("expected option");
6126        }
6127        AlterIndexAction::SetOptions(options) => {
6128            let mut options = options.into_iter();
6129            if let Some(opt) = options.next() {
6130                match opt.name {
6131                    IndexOptionName::RetainHistory => {
6132                        if options.next().is_some() {
6133                            sql_bail!("RETAIN HISTORY must be only option");
6134                        }
6135                        return alter_retain_history(
6136                            scx,
6137                            object_type,
6138                            if_exists,
6139                            UnresolvedObjectName::Item(index_name),
6140                            opt.value,
6141                        );
6142                    }
6143                }
6144            }
6145            sql_bail!("expected option");
6146        }
6147    }
6148}
6149
6150pub fn describe_alter_cluster_set_options(
6151    _: &StatementContext,
6152    _: AlterClusterStatement<Aug>,
6153) -> Result<StatementDesc, PlanError> {
6154    Ok(StatementDesc::new(None))
6155}
6156
6157pub fn plan_alter_cluster(
6158    scx: &mut StatementContext,
6159    AlterClusterStatement {
6160        name,
6161        action,
6162        if_exists,
6163    }: AlterClusterStatement<Aug>,
6164) -> Result<Plan, PlanError> {
6165    let cluster = match resolve_cluster(scx, &name, if_exists)? {
6166        Some(entry) => entry,
6167        None => {
6168            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6169                name: name.to_ast_string_simple(),
6170                object_type: ObjectType::Cluster,
6171            });
6172
6173            return Ok(Plan::AlterNoop(AlterNoopPlan {
6174                object_type: ObjectType::Cluster,
6175            }));
6176        }
6177    };
6178
6179    let mut options: PlanClusterOption = Default::default();
6180    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
6181
6182    match action {
6183        AlterClusterAction::SetOptions {
6184            options: set_options,
6185            with_options,
6186        } => {
6187            let ClusterOptionExtracted {
6188                availability_zones,
6189                introspection_debugging,
6190                introspection_interval,
6191                managed,
6192                replicas: replica_defs,
6193                replication_factor,
6194                seen: _,
6195                size,
6196                disk,
6197                schedule,
6198                workload_class,
6199            }: ClusterOptionExtracted = set_options.try_into()?;
6200
6201            if !scx.catalog.active_role_id().is_system() {
6202                if workload_class.is_some() {
6203                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
6204                }
6205            }
6206
6207            match managed.unwrap_or_else(|| cluster.is_managed()) {
6208                true => {
6209                    let alter_strategy_extracted =
6210                        ClusterAlterOptionExtracted::try_from(with_options)?;
6211                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
6212
6213                    match alter_strategy {
6214                        AlterClusterPlanStrategy::None => {}
6215                        _ => {
6216                            scx.require_feature_flag(
6217                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
6218                            )?;
6219                        }
6220                    }
6221
6222                    if replica_defs.is_some() {
6223                        sql_bail!("REPLICAS not supported for managed clusters");
6224                    }
6225                    if schedule.is_some()
6226                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6227                    {
6228                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6229
6230                        // A cluster with a non-MANUAL schedule is automatically turned On/Off by
6231                        // the cluster scheduling policy, which means its replication factor is
6232                        // always 0 or 1. If the cluster currently has a higher replication factor
6233                        // and the user is not lowering it in the same statement (which would be
6234                        // rejected just below), then reject the schedule change: otherwise we'd
6235                        // leave the cluster in an invalid state with both a non-MANUAL schedule and
6236                        // a replication factor > 1 (which would, e.g., make SHOW CREATE CLUSTER
6237                        // panic).
6238                        if replication_factor.is_none()
6239                            && cluster.replication_factor().is_some_and(|rf| rf > 1)
6240                        {
6241                            sql_bail!(
6242                                "SCHEDULE cannot be set to anything other than MANUAL while the \
6243                                cluster's REPLICATION FACTOR is greater than 1; \
6244                                set the REPLICATION FACTOR to 1 first"
6245                            );
6246                        }
6247                    }
6248
6249                    if replication_factor.is_some() {
6250                        if schedule.is_some()
6251                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6252                        {
6253                            sql_bail!(
6254                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6255                            );
6256                        }
6257                        if let Some(current_schedule) = cluster.schedule() {
6258                            if !matches!(current_schedule, ClusterSchedule::Manual) {
6259                                sql_bail!(
6260                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6261                                );
6262                            }
6263                        }
6264                    }
6265                }
6266                false => {
6267                    if !alter_strategy.is_none() {
6268                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6269                    }
6270                    if availability_zones.is_some() {
6271                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6272                    }
6273                    if replication_factor.is_some() {
6274                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6275                    }
6276                    if introspection_debugging.is_some() {
6277                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6278                    }
6279                    if introspection_interval.is_some() {
6280                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6281                    }
6282                    if size.is_some() {
6283                        sql_bail!("SIZE not supported for unmanaged clusters");
6284                    }
6285                    if disk.is_some() {
6286                        sql_bail!("DISK not supported for unmanaged clusters");
6287                    }
6288                    if schedule.is_some()
6289                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6290                    {
6291                        sql_bail!(
6292                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6293                        );
6294                    }
6295                    if let Some(current_schedule) = cluster.schedule() {
6296                        if !matches!(current_schedule, ClusterSchedule::Manual)
6297                            && schedule.is_none()
6298                        {
6299                            sql_bail!(
6300                                "when switching a cluster to unmanaged, if the managed \
6301                                cluster's SCHEDULE is anything other than MANUAL, you have to \
6302                                explicitly set the SCHEDULE to MANUAL"
6303                            );
6304                        }
6305                    }
6306                }
6307            }
6308
6309            let mut replicas = vec![];
6310            for ReplicaDefinition { name, options } in
6311                replica_defs.into_iter().flat_map(Vec::into_iter)
6312            {
6313                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6314            }
6315
6316            if let Some(managed) = managed {
6317                options.managed = AlterOptionParameter::Set(managed);
6318            }
6319            if let Some(replication_factor) = replication_factor {
6320                options.replication_factor = AlterOptionParameter::Set(replication_factor);
6321            }
6322            if let Some(size) = &size {
6323                options.size = AlterOptionParameter::Set(size.clone());
6324            }
6325            if let Some(availability_zones) = availability_zones {
6326                options.availability_zones = AlterOptionParameter::Set(availability_zones);
6327            }
6328            if let Some(introspection_debugging) = introspection_debugging {
6329                options.introspection_debugging =
6330                    AlterOptionParameter::Set(introspection_debugging);
6331            }
6332            if let Some(introspection_interval) = introspection_interval {
6333                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6334            }
6335            if disk.is_some() {
6336                // The `DISK` option is a no-op for legacy cluster sizes and was never allowed for
6337                // `cc` sizes. The long term plan is to phase out the legacy sizes, at which point
6338                // we'll be able to remove the `DISK` option entirely.
6339                let size = match size.as_deref() {
6340                    Some(s) => s,
6341                    None => cluster
6342                        .managed_size()
6343                        .ok_or_else(|| sql_err!("cluster is not managed"))?,
6344                };
6345                if scx.catalog.is_cluster_size_cc(size) {
6346                    sql_bail!(
6347                        "DISK option not supported for modern cluster sizes because disk is always enabled"
6348                    );
6349                }
6350
6351                scx.catalog
6352                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
6353            }
6354            if !replicas.is_empty() {
6355                options.replicas = AlterOptionParameter::Set(replicas);
6356            }
6357            if let Some(schedule) = schedule {
6358                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6359            }
6360            if let Some(workload_class) = workload_class {
6361                options.workload_class = AlterOptionParameter::Set(workload_class.0);
6362            }
6363        }
6364        AlterClusterAction::ResetOptions(reset_options) => {
6365            use AlterOptionParameter::Reset;
6366            use ClusterOptionName::*;
6367
6368            if !scx.catalog.active_role_id().is_system() {
6369                if reset_options.contains(&WorkloadClass) {
6370                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
6371                }
6372            }
6373
6374            for option in reset_options {
6375                match option {
6376                    AvailabilityZones => options.availability_zones = Reset,
6377                    Disk => scx
6378                        .catalog
6379                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
6380                    IntrospectionInterval => options.introspection_interval = Reset,
6381                    IntrospectionDebugging => options.introspection_debugging = Reset,
6382                    Managed => options.managed = Reset,
6383                    Replicas => options.replicas = Reset,
6384                    ReplicationFactor => options.replication_factor = Reset,
6385                    Size => options.size = Reset,
6386                    Schedule => options.schedule = Reset,
6387                    WorkloadClass => options.workload_class = Reset,
6388                }
6389            }
6390        }
6391    }
6392    Ok(Plan::AlterCluster(AlterClusterPlan {
6393        id: cluster.id(),
6394        name: cluster.name().to_string(),
6395        options,
6396        strategy: alter_strategy,
6397    }))
6398}
6399
6400pub fn describe_alter_set_cluster(
6401    _: &StatementContext,
6402    _: AlterSetClusterStatement<Aug>,
6403) -> Result<StatementDesc, PlanError> {
6404    Ok(StatementDesc::new(None))
6405}
6406
6407pub fn plan_alter_item_set_cluster(
6408    scx: &StatementContext,
6409    AlterSetClusterStatement {
6410        if_exists,
6411        set_cluster: in_cluster_name,
6412        name,
6413        object_type,
6414    }: AlterSetClusterStatement<Aug>,
6415) -> Result<Plan, PlanError> {
6416    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6417
6418    let object_type = object_type.into();
6419
6420    // Prevent access to `SET CLUSTER` for unsupported objects.
6421    match object_type {
6422        ObjectType::MaterializedView => {}
6423        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6424            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6425        }
6426        ObjectType::Table
6427        | ObjectType::View
6428        | ObjectType::Type
6429        | ObjectType::Role
6430        | ObjectType::Cluster
6431        | ObjectType::ClusterReplica
6432        | ObjectType::Secret
6433        | ObjectType::Connection
6434        | ObjectType::Database
6435        | ObjectType::Schema
6436        | ObjectType::Func
6437        | ObjectType::NetworkPolicy => {
6438            bail_never_supported!(
6439                format!("ALTER {object_type} SET CLUSTER"),
6440                "sql/alter-set-cluster/",
6441                format!("{object_type} has no associated cluster")
6442            )
6443        }
6444    }
6445
6446    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6447
6448    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6449        Some(entry) => {
6450            let current_cluster = entry.cluster_id();
6451            let Some(current_cluster) = current_cluster else {
6452                sql_bail!("No cluster associated with {name}");
6453            };
6454
6455            if current_cluster == in_cluster.id() {
6456                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6457            } else {
6458                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6459                    id: entry.id(),
6460                    set_cluster: in_cluster.id(),
6461                }))
6462            }
6463        }
6464        None => {
6465            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6466                name: name.to_ast_string_simple(),
6467                object_type,
6468            });
6469
6470            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6471        }
6472    }
6473}
6474
6475pub fn describe_alter_object_rename(
6476    _: &StatementContext,
6477    _: AlterObjectRenameStatement,
6478) -> Result<StatementDesc, PlanError> {
6479    Ok(StatementDesc::new(None))
6480}
6481
6482pub fn plan_alter_object_rename(
6483    scx: &mut StatementContext,
6484    AlterObjectRenameStatement {
6485        name,
6486        object_type,
6487        to_item_name,
6488        if_exists,
6489    }: AlterObjectRenameStatement,
6490) -> Result<Plan, PlanError> {
6491    let object_type = object_type.into();
6492    match (object_type, name) {
6493        (
6494            ObjectType::View
6495            | ObjectType::MaterializedView
6496            | ObjectType::Table
6497            | ObjectType::Source
6498            | ObjectType::Index
6499            | ObjectType::Sink
6500            | ObjectType::Secret
6501            | ObjectType::Connection,
6502            UnresolvedObjectName::Item(name),
6503        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6504        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6505            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6506        }
6507        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6508            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6509        }
6510        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6511            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6512        }
6513        (object_type, name) => {
6514            // The earlier dispatch + name resolution should make this
6515            // combination impossible.
6516            bail_internal!("invalid object type '{object_type}' for ALTER RENAME with name {name}")
6517        }
6518    }
6519}
6520
6521pub fn plan_alter_schema_rename(
6522    scx: &mut StatementContext,
6523    name: UnresolvedSchemaName,
6524    to_schema_name: Ident,
6525    if_exists: bool,
6526) -> Result<Plan, PlanError> {
6527    // Special case for mz_temp: with lazy temporary schema creation, the temp
6528    // schema may not exist yet, but we still need to return the correct error.
6529    // Check the schema name directly against MZ_TEMP_SCHEMA.
6530    let normalized = normalize::unresolved_schema_name(name.clone())?;
6531    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6532        sql_bail!(
6533            "cannot rename schemas in the ambient database: {:?}",
6534            mz_repr::namespaces::MZ_TEMP_SCHEMA
6535        );
6536    }
6537
6538    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6539        let object_type = ObjectType::Schema;
6540        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6541            name: name.to_ast_string_simple(),
6542            object_type,
6543        });
6544        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6545    };
6546
6547    // Make sure the name is unique.
6548    if scx
6549        .resolve_schema_in_database(&db_spec, &to_schema_name)
6550        .is_ok()
6551    {
6552        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6553            to_schema_name.clone().into_string(),
6554        )));
6555    }
6556
6557    // Prevent users from renaming system related schemas.
6558    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6559    if schema.id().is_system() {
6560        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6561    }
6562
6563    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6564        cur_schema_spec: (db_spec, schema_spec),
6565        new_schema_name: to_schema_name.into_string(),
6566    }))
6567}
6568
6569pub fn plan_alter_schema_swap<F>(
6570    scx: &mut StatementContext,
6571    name_a: UnresolvedSchemaName,
6572    name_b: Ident,
6573    if_exists: bool,
6574    gen_temp_suffix: F,
6575) -> Result<Plan, PlanError>
6576where
6577    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6578{
6579    // Special case for mz_temp: with lazy temporary schema creation, the temp
6580    // schema may not exist yet, but we still need to return the correct error.
6581    // Check the schema name directly against MZ_TEMP_SCHEMA.
6582    let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
6583    if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
6584    {
6585        sql_bail!("cannot swap schemas that are in the ambient database");
6586    }
6587    // Also check name_b (the target schema name)
6588    let name_b_str = normalize::ident_ref(&name_b);
6589    if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6590        sql_bail!("cannot swap schemas that are in the ambient database");
6591    }
6592
6593    let schema_a = match scx.resolve_schema(name_a.clone()) {
6594        Ok(schema) => schema,
6595        Err(_) if if_exists => {
6596            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6597                name: name_a.to_ast_string_simple(),
6598                object_type: ObjectType::Schema,
6599            });
6600            return Ok(Plan::AlterNoop(AlterNoopPlan {
6601                object_type: ObjectType::Schema,
6602            }));
6603        }
6604        Err(e) => return Err(e),
6605    };
6606
6607    let db_spec = schema_a.database().clone();
6608    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6609        sql_bail!("cannot swap schemas that are in the ambient database");
6610    };
6611    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6612
6613    // We cannot swap system schemas.
6614    if schema_a.id().is_system() || schema_b.id().is_system() {
6615        bail_never_supported!("swapping a system schema".to_string())
6616    }
6617
6618    // Generate a temporary name we can swap schema_a to.
6619    //
6620    // 'check' returns if the temp schema name would be valid.
6621    const SCHEMA_SWAP_PREFIX: &str = "mz_schema_swap_";
6622    let check = |temp_suffix: &str| {
6623        let mut temp_name = ident!(SCHEMA_SWAP_PREFIX);
6624        temp_name.append_lossy(temp_suffix);
6625        scx.resolve_schema_in_database(&db_spec, &temp_name)
6626            .is_err()
6627    };
6628    let temp_suffix = gen_temp_suffix(&check)?;
6629    let name_temp = format!("{SCHEMA_SWAP_PREFIX}{temp_suffix}");
6630
6631    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6632        schema_a_spec: (*schema_a.database(), *schema_a.id()),
6633        schema_a_name: schema_a.name().schema.to_string(),
6634        schema_b_spec: (*schema_b.database(), *schema_b.id()),
6635        schema_b_name: schema_b.name().schema.to_string(),
6636        name_temp,
6637    }))
6638}
6639
6640pub fn plan_alter_item_rename(
6641    scx: &mut StatementContext,
6642    object_type: ObjectType,
6643    name: UnresolvedItemName,
6644    to_item_name: Ident,
6645    if_exists: bool,
6646) -> Result<Plan, PlanError> {
6647    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6648        Ok(r) => r,
6649        // Return a more helpful error on `DROP VIEW <materialized-view>`.
6650        Err(PlanError::MismatchedObjectType {
6651            name,
6652            is_type: ObjectType::MaterializedView,
6653            expected_type: ObjectType::View,
6654        }) => {
6655            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6656        }
6657        e => e?,
6658    };
6659
6660    match resolved {
6661        Some(entry) => {
6662            let full_name = scx.catalog.resolve_full_name(entry.name());
6663            let item_type = entry.item_type();
6664
6665            let proposed_name = QualifiedItemName {
6666                qualifiers: entry.name().qualifiers.clone(),
6667                item: to_item_name.clone().into_string(),
6668            };
6669
6670            // For PostgreSQL compatibility, items and types cannot have
6671            // overlapping names in a variety of situations. See the comment on
6672            // `CatalogItemType::conflicts_with_type` for details.
6673            let conflicting_type_exists;
6674            let conflicting_item_exists;
6675            if item_type == CatalogItemType::Type {
6676                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6677                conflicting_item_exists = scx
6678                    .catalog
6679                    .get_item_by_name(&proposed_name)
6680                    .map(|item| item.item_type().conflicts_with_type())
6681                    .unwrap_or(false);
6682            } else {
6683                conflicting_type_exists = item_type.conflicts_with_type()
6684                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
6685                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6686            };
6687            if conflicting_type_exists || conflicting_item_exists {
6688                sql_bail!("catalog item '{}' already exists", to_item_name);
6689            }
6690
6691            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6692                id: entry.id(),
6693                current_full_name: full_name,
6694                to_name: normalize::ident(to_item_name),
6695                object_type,
6696            }))
6697        }
6698        None => {
6699            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6700                name: name.to_ast_string_simple(),
6701                object_type,
6702            });
6703
6704            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6705        }
6706    }
6707}
6708
6709pub fn plan_alter_cluster_rename(
6710    scx: &mut StatementContext,
6711    object_type: ObjectType,
6712    name: Ident,
6713    to_name: Ident,
6714    if_exists: bool,
6715) -> Result<Plan, PlanError> {
6716    match resolve_cluster(scx, &name, if_exists)? {
6717        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6718            id: entry.id(),
6719            name: entry.name().to_string(),
6720            to_name: ident(to_name),
6721        })),
6722        None => {
6723            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6724                name: name.to_ast_string_simple(),
6725                object_type,
6726            });
6727
6728            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6729        }
6730    }
6731}
6732
6733pub fn plan_alter_cluster_swap<F>(
6734    scx: &mut StatementContext,
6735    name_a: Ident,
6736    name_b: Ident,
6737    if_exists: bool,
6738    gen_temp_suffix: F,
6739) -> Result<Plan, PlanError>
6740where
6741    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6742{
6743    let cluster_a = match scx.resolve_cluster(Some(&name_a)) {
6744        Ok(cluster) => cluster,
6745        Err(_) if if_exists => {
6746            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6747                name: name_a.to_ast_string_simple(),
6748                object_type: ObjectType::Cluster,
6749            });
6750            return Ok(Plan::AlterNoop(AlterNoopPlan {
6751                object_type: ObjectType::Cluster,
6752            }));
6753        }
6754        Err(e) => return Err(e),
6755    };
6756    let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6757
6758    const CLUSTER_SWAP_PREFIX: &str = "mz_cluster_swap_";
6759    let check = |temp_suffix: &str| {
6760        let mut temp_name = ident!(CLUSTER_SWAP_PREFIX);
6761        temp_name.append_lossy(temp_suffix);
6762        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6763            // Temp name does not exist, so we can use it.
6764            Err(CatalogError::UnknownCluster(_)) => true,
6765            // Temp name already exists!
6766            Ok(_) | Err(_) => false,
6767        }
6768    };
6769    let temp_suffix = gen_temp_suffix(&check)?;
6770    let name_temp = format!("{CLUSTER_SWAP_PREFIX}{temp_suffix}");
6771
6772    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6773        id_a: cluster_a.id(),
6774        id_b: cluster_b.id(),
6775        name_a: name_a.into_string(),
6776        name_b: name_b.into_string(),
6777        name_temp,
6778    }))
6779}
6780
6781pub fn plan_alter_cluster_replica_rename(
6782    scx: &mut StatementContext,
6783    object_type: ObjectType,
6784    name: QualifiedReplica,
6785    to_item_name: Ident,
6786    if_exists: bool,
6787) -> Result<Plan, PlanError> {
6788    match resolve_cluster_replica(scx, &name, if_exists)? {
6789        Some((cluster, replica)) => {
6790            ensure_cluster_is_not_managed(scx, cluster.id())?;
6791            Ok(Plan::AlterClusterReplicaRename(
6792                AlterClusterReplicaRenamePlan {
6793                    cluster_id: cluster.id(),
6794                    replica_id: replica,
6795                    name: QualifiedReplica {
6796                        cluster: Ident::new(cluster.name())?,
6797                        replica: name.replica,
6798                    },
6799                    to_name: normalize::ident(to_item_name),
6800                },
6801            ))
6802        }
6803        None => {
6804            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6805                name: name.to_ast_string_simple(),
6806                object_type,
6807            });
6808
6809            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6810        }
6811    }
6812}
6813
6814pub fn describe_alter_object_swap(
6815    _: &StatementContext,
6816    _: AlterObjectSwapStatement,
6817) -> Result<StatementDesc, PlanError> {
6818    Ok(StatementDesc::new(None))
6819}
6820
6821pub fn plan_alter_object_swap(
6822    scx: &mut StatementContext,
6823    stmt: AlterObjectSwapStatement,
6824) -> Result<Plan, PlanError> {
6825    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6826
6827    let AlterObjectSwapStatement {
6828        object_type,
6829        if_exists,
6830        name_a,
6831        name_b,
6832    } = stmt;
6833    let object_type = object_type.into();
6834
6835    // We'll try 10 times to generate a temporary suffix.
6836    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6837        let mut attempts = 0;
6838        let name_temp = loop {
6839            attempts += 1;
6840            if attempts > 10 {
6841                tracing::warn!("Unable to generate temp id for swapping");
6842                sql_bail!("unable to swap!");
6843            }
6844
6845            // Call the provided closure to make sure this name is unique!
6846            let short_id = mz_ore::id_gen::temp_id();
6847            if check_fn(&short_id) {
6848                break short_id;
6849            }
6850        };
6851
6852        Ok(name_temp)
6853    };
6854
6855    match (object_type, name_a, name_b) {
6856        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6857            plan_alter_schema_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
6858        }
6859        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6860            plan_alter_cluster_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
6861        }
6862        (ObjectType::Schema | ObjectType::Cluster, _, _) => {
6863            bail_internal!("name type does not match object type for ALTER SWAP")
6864        }
6865        (
6866            ObjectType::Table
6867            | ObjectType::View
6868            | ObjectType::MaterializedView
6869            | ObjectType::Source
6870            | ObjectType::Sink
6871            | ObjectType::Index
6872            | ObjectType::Type
6873            | ObjectType::Role
6874            | ObjectType::ClusterReplica
6875            | ObjectType::Secret
6876            | ObjectType::Connection
6877            | ObjectType::Database
6878            | ObjectType::Func
6879            | ObjectType::NetworkPolicy,
6880            _,
6881            _,
6882        ) => Err(PlanError::Unsupported {
6883            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6884            discussion_no: None,
6885        }),
6886    }
6887}
6888
6889pub fn describe_alter_retain_history(
6890    _: &StatementContext,
6891    _: AlterRetainHistoryStatement<Aug>,
6892) -> Result<StatementDesc, PlanError> {
6893    Ok(StatementDesc::new(None))
6894}
6895
6896pub fn plan_alter_retain_history(
6897    scx: &StatementContext,
6898    AlterRetainHistoryStatement {
6899        object_type,
6900        if_exists,
6901        name,
6902        history,
6903    }: AlterRetainHistoryStatement<Aug>,
6904) -> Result<Plan, PlanError> {
6905    alter_retain_history(scx, object_type.into(), if_exists, name, history)
6906}
6907
6908fn alter_retain_history(
6909    scx: &StatementContext,
6910    object_type: ObjectType,
6911    if_exists: bool,
6912    name: UnresolvedObjectName,
6913    history: Option<WithOptionValue<Aug>>,
6914) -> Result<Plan, PlanError> {
6915    let name = match (object_type, name) {
6916        (
6917            // View gets a special error below.
6918            ObjectType::View
6919            | ObjectType::MaterializedView
6920            | ObjectType::Table
6921            | ObjectType::Source
6922            | ObjectType::Index,
6923            UnresolvedObjectName::Item(name),
6924        ) => name,
6925        (object_type, _) => {
6926            bail_unsupported!(format!("RETAIN HISTORY on {object_type}"))
6927        }
6928    };
6929    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6930        Some(entry) => {
6931            let full_name = scx.catalog.resolve_full_name(entry.name());
6932            let item_type = entry.item_type();
6933
6934            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
6935            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
6936                return Err(PlanError::AlterViewOnMaterializedView(
6937                    full_name.to_string(),
6938                ));
6939            } else if object_type == ObjectType::View {
6940                sql_bail!("{object_type} does not support RETAIN HISTORY")
6941            } else if object_type != item_type {
6942                sql_bail!(
6943                    "\"{}\" is a {} not a {}",
6944                    full_name,
6945                    entry.item_type(),
6946                    format!("{object_type}").to_lowercase()
6947                )
6948            }
6949
6950            // Save the original value so we can write it back down in the create_sql catalog item.
6951            let (value, lcw) = match &history {
6952                Some(WithOptionValue::RetainHistoryFor(value)) => {
6953                    let window = OptionalDuration::try_from_value(value.clone())?;
6954                    (Some(value.clone()), window.0)
6955                }
6956                // None is RESET, so use the default CW.
6957                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6958                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6959            };
6960            let window = plan_retain_history(scx, lcw)?;
6961
6962            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6963                id: entry.id(),
6964                value,
6965                window,
6966                object_type,
6967            }))
6968        }
6969        None => {
6970            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6971                name: name.to_ast_string_simple(),
6972                object_type,
6973            });
6974
6975            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6976        }
6977    }
6978}
6979
6980fn alter_source_timestamp_interval(
6981    scx: &StatementContext,
6982    if_exists: bool,
6983    source_name: UnresolvedItemName,
6984    value: Option<WithOptionValue<Aug>>,
6985) -> Result<Plan, PlanError> {
6986    let object_type = ObjectType::Source;
6987    match resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)? {
6988        Some(entry) => {
6989            let full_name = scx.catalog.resolve_full_name(entry.name());
6990            if entry.item_type() != CatalogItemType::Source {
6991                sql_bail!(
6992                    "\"{}\" is a {} not a {}",
6993                    full_name,
6994                    entry.item_type(),
6995                    format!("{object_type}").to_lowercase()
6996                )
6997            }
6998
6999            match value {
7000                Some(val) => {
7001                    let val = match val {
7002                        WithOptionValue::Value(v) => v,
7003                        _ => sql_bail!("TIMESTAMP INTERVAL requires an interval value"),
7004                    };
7005                    let duration = Duration::try_from_value(val.clone())?;
7006
7007                    let min = scx.catalog.system_vars().min_timestamp_interval();
7008                    let max = scx.catalog.system_vars().max_timestamp_interval();
7009                    if duration < min || duration > max {
7010                        return Err(PlanError::InvalidTimestampInterval {
7011                            min,
7012                            max,
7013                            requested: duration,
7014                        });
7015                    }
7016
7017                    Ok(Plan::AlterSourceTimestampInterval(
7018                        AlterSourceTimestampIntervalPlan {
7019                            id: entry.id(),
7020                            value: Some(val),
7021                            interval: duration,
7022                        },
7023                    ))
7024                }
7025                None => {
7026                    let interval = scx.catalog.system_vars().default_timestamp_interval();
7027                    Ok(Plan::AlterSourceTimestampInterval(
7028                        AlterSourceTimestampIntervalPlan {
7029                            id: entry.id(),
7030                            value: None,
7031                            interval,
7032                        },
7033                    ))
7034                }
7035            }
7036        }
7037        None => {
7038            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7039                name: source_name.to_ast_string_simple(),
7040                object_type,
7041            });
7042
7043            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
7044        }
7045    }
7046}
7047
7048pub fn describe_alter_secret_options(
7049    _: &StatementContext,
7050    _: AlterSecretStatement<Aug>,
7051) -> Result<StatementDesc, PlanError> {
7052    Ok(StatementDesc::new(None))
7053}
7054
7055pub fn plan_alter_secret(
7056    scx: &mut StatementContext,
7057    stmt: AlterSecretStatement<Aug>,
7058) -> Result<Plan, PlanError> {
7059    let AlterSecretStatement {
7060        name,
7061        if_exists,
7062        value,
7063    } = stmt;
7064    let object_type = ObjectType::Secret;
7065    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7066        Some(entry) => entry.id(),
7067        None => {
7068            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7069                name: name.to_string(),
7070                object_type,
7071            });
7072
7073            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7074        }
7075    };
7076
7077    let secret_as = query::plan_secret_as(scx, value)?;
7078
7079    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
7080}
7081
7082pub fn describe_alter_connection(
7083    _: &StatementContext,
7084    _: AlterConnectionStatement<Aug>,
7085) -> Result<StatementDesc, PlanError> {
7086    Ok(StatementDesc::new(None))
7087}
7088
// Declares the options accepted in the `WITH (...)` clause of `ALTER CONNECTION`: a single
// boolean `Validate` option.
// NOTE(review): presumably this expands to the `AlterConnectionOptionExtracted` type (with a
// `validate: Option<bool>` field) that `plan_alter_connection` uses — confirm against the macro.
generate_extracted_config!(AlterConnectionOption, (Validate, bool));
7090
/// Plans an `ALTER CONNECTION` statement.
///
/// Two mutually exclusive shapes are handled:
///
/// * `ROTATE KEYS`, which must be the only action, takes no `WITH` options, and
///   is only accepted for SSH connections.
/// * Any combination of `SET` and `DROP`/`RESET` option actions, which are
///   validated and partitioned into `set_options` and `drop_options`.
///
/// If the connection cannot be resolved and `if_exists` is set, an
/// `ObjectDoesNotExist` notice is added and a no-op plan is returned.
///
/// # Errors
///
/// Returns an error if the name does not resolve (without `if_exists`), the
/// resolved item is not a connection, `ROTATE KEYS` is combined with anything
/// else or used on a non-SSH connection, an inalterable option is named, an
/// option is both set and dropped, or members of a mutually exclusive option
/// set are both set and dropped.
pub fn plan_alter_connection(
    scx: &StatementContext,
    stmt: AlterConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterConnectionStatement {
        name,
        if_exists,
        actions,
        with_options,
    } = stmt;
    let conn_name = normalize::unresolved_item_name(name)?;
    let entry = match scx.catalog.resolve_item(&conn_name) {
        Ok(entry) => entry,
        // With `IF EXISTS`, every resolution error is reported as a notice
        // rather than failing the statement.
        Err(_) if if_exists => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: conn_name.to_string(),
                object_type: ObjectType::Connection,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Connection,
            }));
        }
        Err(e) => return Err(e.into()),
    };

    let connection = entry.connection()?;

    // `ROTATE KEYS` is handled entirely in this branch and returns early, so
    // the option handling below never sees a `RotateKeys` action.
    if actions
        .iter()
        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
    {
        if actions.len() > 1 {
            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
        }

        if !with_options.is_empty() {
            sql_bail!(
                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
                with_options
                    .iter()
                    .map(|o| o.to_ast_string_simple())
                    .join(", ")
            );
        }

        if !matches!(connection, Connection::Ssh(_)) {
            sql_bail!(
                "{} is not an SSH connection",
                scx.catalog.resolve_full_name(entry.name())
            )
        }

        return Ok(Plan::AlterConnection(AlterConnectionPlan {
            id: entry.id(),
            action: crate::plan::AlterConnectionAction::RotateKeys,
        }));
    }

    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
    // Explicitly specifying `VALIDATE` is gated behind a feature flag.
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }

    // An explicit `VALIDATE` value wins; otherwise validation requires both the
    // system-wide default and the connection's own default to allow it.
    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && connection.validate_by_default()
        }
    };

    // Map the existing connection onto the type used for option validation.
    let connection_type = match connection {
        Connection::Aws(_) => CreateConnectionType::Aws,
        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
        Connection::Gcp(_) => CreateConnectionType::Gcp,
        Connection::Kafka(_) => CreateConnectionType::Kafka,
        Connection::Csr(_) => CreateConnectionType::Csr,
        Connection::GlueSchemaRegistry(_) => CreateConnectionType::GlueSchemaRegistry,
        Connection::Postgres(_) => CreateConnectionType::Postgres,
        Connection::Ssh(_) => CreateConnectionType::Ssh,
        Connection::MySql(_) => CreateConnectionType::MySql,
        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
    };

    // Collect all options irrespective of action taken on them.
    let specified_options: BTreeSet<_> = actions
        .iter()
        .map(|action: &AlterConnectionAction<Aug>| match action {
            AlterConnectionAction::SetOption(option) => Ok(option.name.clone()),
            AlterConnectionAction::DropOption(name) => Ok(name.clone()),
            AlterConnectionAction::RotateKeys => {
                Err(internal_err!("RotateKeys is handled separately above"))
            }
        })
        .collect::<Result<_, PlanError>>()?;

    // Reject options that may never be altered, whatever the action.
    for invalid in INALTERABLE_OPTIONS {
        if specified_options.contains(invalid) {
            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
        }
    }

    connection::validate_options_per_connection_type(connection_type, specified_options)?;

    // Partition operations into set and drop.
    let mut set_options_vec: Vec<_> = Vec::new();
    let mut drop_options: BTreeSet<_> = BTreeSet::new();
    for action in actions {
        match action {
            AlterConnectionAction::SetOption(option) => set_options_vec.push(option),
            AlterConnectionAction::DropOption(name) => {
                drop_options.insert(name);
            }
            AlterConnectionAction::RotateKeys => {
                bail_internal!("RotateKeys is handled separately above")
            }
        }
    }

    // The vector is cloned because it is consumed a second time below by
    // `ConnectionOptionExtracted::try_from`.
    let set_options: BTreeMap<_, _> = set_options_vec
        .clone()
        .into_iter()
        .map(|option| (option.name, option.value))
        .collect();

    // Type check values + avoid duplicates; we don't want to e.g. let users
    // drop and set the same option in the same statement, so treating drops as
    // sets here is fine.
    let connection_options_extracted =
        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;

    // Options that appear in both a SET and a DROP/RESET action.
    let duplicates: Vec<_> = connection_options_extracted
        .seen
        .intersection(&drop_options)
        .collect();

    if !duplicates.is_empty() {
        sql_bail!(
            "cannot both SET and DROP/RESET options {}",
            duplicates
                .iter()
                .map(|option| option.to_string())
                .join(", ")
        )
    }

    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
        let set_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| set_options.contains_key(o))
            .count();
        let drop_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| drop_options.contains(o))
            .count();

        // Disallow setting _and_ resetting mutually exclusive options
        if set_options_count > 0 && drop_options_count > 0 {
            sql_bail!(
                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
                connection_type,
                mutually_exclusive_options
                    .iter()
                    .map(|option| option.to_string())
                    .join(", ")
            )
        }

        // If any option is either set or dropped, ensure all mutually exclusive
        // options are dropped. We do this "behind the scenes", even though we
        // disallow users from performing the same action because this is the
        // mechanism by which we overwrite values elsewhere in the code.
        if set_options_count > 0 || drop_options_count > 0 {
            drop_options.extend(mutually_exclusive_options.iter().cloned());
        }

        // n.b. if mutually exclusive options are set, those will error when we
        // try to replan the connection.
    }

    Ok(Plan::AlterConnection(AlterConnectionPlan {
        id: entry.id(),
        action: crate::plan::AlterConnectionAction::AlterOptions {
            set_options,
            drop_options,
            validate,
        },
    }))
}
7284
7285pub fn describe_alter_sink(
7286    _: &StatementContext,
7287    _: AlterSinkStatement<Aug>,
7288) -> Result<StatementDesc, PlanError> {
7289    Ok(StatementDesc::new(None))
7290}
7291
7292pub fn plan_alter_sink(
7293    scx: &mut StatementContext,
7294    stmt: AlterSinkStatement<Aug>,
7295) -> Result<Plan, PlanError> {
7296    let AlterSinkStatement {
7297        sink_name,
7298        if_exists,
7299        action,
7300    } = stmt;
7301
7302    let object_type = ObjectType::Sink;
7303    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;
7304
7305    let Some(item) = item else {
7306        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7307            name: sink_name.to_string(),
7308            object_type,
7309        });
7310
7311        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7312    };
7313    // Always ALTER objects from their latest version.
7314    let item = item.at_version(RelationVersionSelector::Latest);
7315
7316    match action {
7317        AlterSinkAction::ChangeRelation(new_from) => {
7318            // First we reconstruct the original CREATE SINK statement
7319            let create_sql = item.create_sql();
7320            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
7321            let [stmt]: [StatementParseResult; 1] = stmts
7322                .try_into()
7323                .map_err(|_| internal_err!("create SQL of sink was not exactly one statement"))?;
7324            let Statement::CreateSink(stmt) = stmt.ast else {
7325                bail_internal!("create SQL of sink is not a CREATE SINK statement");
7326            };
7327
7328            // Then resolve and swap the resolved from relation to the new one
7329            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
7330            stmt.from = new_from;
7331
7332            // Finally re-plan the modified create sink statement to verify the new configuration is valid
7333            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
7334                bail_internal!("plan_sink did not produce a CreateSink plan");
7335            };
7336
7337            plan.sink.version += 1;
7338
7339            Ok(Plan::AlterSink(AlterSinkPlan {
7340                item_id: item.id(),
7341                global_id: item.global_id(),
7342                sink: plan.sink,
7343                with_snapshot: plan.with_snapshot,
7344                in_cluster: plan.in_cluster,
7345            }))
7346        }
7347        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
7348        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
7349    }
7350}
7351
7352pub fn describe_alter_source(
7353    _: &StatementContext,
7354    _: AlterSourceStatement<Aug>,
7355) -> Result<StatementDesc, PlanError> {
7356    // TODO: put the options here, right?
7357    Ok(StatementDesc::new(None))
7358}
7359
// Declares the options of `AlterSourceAddSubsourceOption`: `TextColumns` and `ExcludeColumns`
// are lists of unresolved item names that default to empty, and `Details` is a string with no
// default given here.
// NOTE(review): presumably this expands to an `AlterSourceAddSubsourceOptionExtracted` type —
// confirm against the macro definition.
generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);
7366
7367pub fn plan_alter_source(
7368    scx: &mut StatementContext,
7369    stmt: AlterSourceStatement<Aug>,
7370) -> Result<Plan, PlanError> {
7371    let AlterSourceStatement {
7372        source_name,
7373        if_exists,
7374        action,
7375    } = stmt;
7376    let object_type = ObjectType::Source;
7377
7378    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
7379        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7380            name: source_name.to_string(),
7381            object_type,
7382        });
7383
7384        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7385    }
7386
7387    match action {
7388        AlterSourceAction::SetOptions(options) => {
7389            let mut options = options.into_iter();
7390            let option = options
7391                .next()
7392                .ok_or_else(|| sql_err!("ALTER SOURCE SET requires at least one option"))?;
7393            if option.name == CreateSourceOptionName::RetainHistory {
7394                if options.next().is_some() {
7395                    sql_bail!("RETAIN HISTORY must be only option");
7396                }
7397                return alter_retain_history(
7398                    scx,
7399                    object_type,
7400                    if_exists,
7401                    UnresolvedObjectName::Item(source_name),
7402                    option.value,
7403                );
7404            }
7405            if option.name == CreateSourceOptionName::TimestampInterval {
7406                if options.next().is_some() {
7407                    sql_bail!("TIMESTAMP INTERVAL must be only option");
7408                }
7409                return alter_source_timestamp_interval(scx, if_exists, source_name, option.value);
7410            }
7411            // n.b we use this statement in purification in a way that cannot be
7412            // planned directly.
7413            sql_bail!(
7414                "Cannot modify the {} of a SOURCE.",
7415                option.name.to_ast_string_simple()
7416            );
7417        }
7418        AlterSourceAction::ResetOptions(reset) => {
7419            let mut options = reset.into_iter();
7420            let option = options
7421                .next()
7422                .ok_or_else(|| sql_err!("ALTER SOURCE RESET requires at least one option"))?;
7423            if option == CreateSourceOptionName::RetainHistory {
7424                if options.next().is_some() {
7425                    sql_bail!("RETAIN HISTORY must be only option");
7426                }
7427                return alter_retain_history(
7428                    scx,
7429                    object_type,
7430                    if_exists,
7431                    UnresolvedObjectName::Item(source_name),
7432                    None,
7433                );
7434            }
7435            if option == CreateSourceOptionName::TimestampInterval {
7436                if options.next().is_some() {
7437                    sql_bail!("TIMESTAMP INTERVAL must be only option");
7438                }
7439                return alter_source_timestamp_interval(scx, if_exists, source_name, None);
7440            }
7441            sql_bail!(
7442                "Cannot modify the {} of a SOURCE.",
7443                option.to_ast_string_simple()
7444            );
7445        }
7446        AlterSourceAction::DropSubsources { .. } => {
7447            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
7448        }
7449        AlterSourceAction::AddSubsources { .. } => {
7450            sql_bail!("ALTER SOURCE...ADD SUBSOURCE must be purified before planning")
7451        }
7452        AlterSourceAction::RefreshReferences => {
7453            sql_bail!("ALTER SOURCE...REFRESH REFERENCES must be purified before planning")
7454        }
7455    };
7456}
7457
7458pub fn describe_alter_system_set(
7459    _: &StatementContext,
7460    _: AlterSystemSetStatement,
7461) -> Result<StatementDesc, PlanError> {
7462    Ok(StatementDesc::new(None))
7463}
7464
7465pub fn plan_alter_system_set(
7466    _: &StatementContext,
7467    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7468) -> Result<Plan, PlanError> {
7469    let name = name.to_string();
7470    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7471        name,
7472        value: scl::plan_set_variable_to(to)?,
7473    }))
7474}
7475
7476pub fn describe_alter_system_reset(
7477    _: &StatementContext,
7478    _: AlterSystemResetStatement,
7479) -> Result<StatementDesc, PlanError> {
7480    Ok(StatementDesc::new(None))
7481}
7482
7483pub fn plan_alter_system_reset(
7484    _: &StatementContext,
7485    AlterSystemResetStatement { name }: AlterSystemResetStatement,
7486) -> Result<Plan, PlanError> {
7487    let name = name.to_string();
7488    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7489}
7490
7491pub fn describe_alter_system_reset_all(
7492    _: &StatementContext,
7493    _: AlterSystemResetAllStatement,
7494) -> Result<StatementDesc, PlanError> {
7495    Ok(StatementDesc::new(None))
7496}
7497
7498pub fn plan_alter_system_reset_all(
7499    _: &StatementContext,
7500    _: AlterSystemResetAllStatement,
7501) -> Result<Plan, PlanError> {
7502    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7503}
7504
7505pub fn describe_alter_role(
7506    _: &StatementContext,
7507    _: AlterRoleStatement<Aug>,
7508) -> Result<StatementDesc, PlanError> {
7509    Ok(StatementDesc::new(None))
7510}
7511
7512pub fn plan_alter_role(
7513    scx: &StatementContext,
7514    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7515) -> Result<Plan, PlanError> {
7516    let option = match option {
7517        AlterRoleOption::Attributes(attrs) => {
7518            let attrs = plan_role_attributes(attrs, scx)?;
7519            PlannedAlterRoleOption::Attributes(attrs)
7520        }
7521        AlterRoleOption::Variable(variable) => {
7522            let var = plan_role_variable(scx, variable)?;
7523            PlannedAlterRoleOption::Variable(var)
7524        }
7525    };
7526
7527    Ok(Plan::AlterRole(AlterRolePlan {
7528        id: name.id,
7529        name: name.name,
7530        option,
7531    }))
7532}
7533
7534pub fn describe_alter_table_add_column(
7535    _: &StatementContext,
7536    _: AlterTableAddColumnStatement<Aug>,
7537) -> Result<StatementDesc, PlanError> {
7538    Ok(StatementDesc::new(None))
7539}
7540
7541pub fn plan_alter_table_add_column(
7542    scx: &StatementContext,
7543    stmt: AlterTableAddColumnStatement<Aug>,
7544) -> Result<Plan, PlanError> {
7545    let AlterTableAddColumnStatement {
7546        if_exists,
7547        name,
7548        if_col_not_exist,
7549        column_name,
7550        data_type,
7551    } = stmt;
7552    let object_type = ObjectType::Table;
7553
7554    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;
7555
7556    let (relation_id, item_name, desc) =
7557        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7558            Some(item) => {
7559                // Always add columns to the latest version of the item.
7560                let item_name = scx.catalog.resolve_full_name(item.name());
7561                let item = item.at_version(RelationVersionSelector::Latest);
7562                let desc = item
7563                    .relation_desc()
7564                    .ok_or_else(|| sql_err!("item does not have a relation description"))?
7565                    .into_owned();
7566                (item.id(), item_name, desc)
7567            }
7568            None => {
7569                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7570                    name: name.to_ast_string_simple(),
7571                    object_type,
7572                });
7573                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7574            }
7575        };
7576
7577    let column_name = ColumnName::from(column_name.as_str());
7578    if desc.get_by_name(&column_name).is_some() {
7579        if if_col_not_exist {
7580            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
7581                column_name: column_name.to_string(),
7582                object_name: item_name.item,
7583            });
7584            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7585        } else {
7586            return Err(PlanError::ColumnAlreadyExists {
7587                column_name,
7588                object_name: item_name.item,
7589            });
7590        }
7591    }
7592
7593    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
7594    // TODO(alter_table): Support non-nullable columns with default values.
7595    let column_type = scalar_type.nullable(true);
7596    // "unresolve" our data type so we can later update the persisted create_sql.
7597    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;
7598
7599    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
7600        relation_id,
7601        column_name,
7602        column_type,
7603        raw_sql_type,
7604    }))
7605}
7606
7607pub fn describe_alter_materialized_view_apply_replacement(
7608    _: &StatementContext,
7609    _: AlterMaterializedViewApplyReplacementStatement,
7610) -> Result<StatementDesc, PlanError> {
7611    Ok(StatementDesc::new(None))
7612}
7613
7614pub fn plan_alter_materialized_view_apply_replacement(
7615    scx: &StatementContext,
7616    stmt: AlterMaterializedViewApplyReplacementStatement,
7617) -> Result<Plan, PlanError> {
7618    let AlterMaterializedViewApplyReplacementStatement {
7619        if_exists,
7620        name,
7621        replacement_name,
7622    } = stmt;
7623
7624    scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
7625
7626    let object_type = ObjectType::MaterializedView;
7627    let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
7628        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7629            name: name.to_ast_string_simple(),
7630            object_type,
7631        });
7632        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7633    };
7634
7635    let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
7636        .ok_or_else(|| sql_err!("replacement materialized view does not exist"))?;
7637
7638    if replacement.replacement_target() != Some(mv.id()) {
7639        return Err(PlanError::InvalidReplacement {
7640            item_type: mv.item_type(),
7641            item_name: scx.catalog.minimal_qualification(mv.name()),
7642            replacement_type: replacement.item_type(),
7643            replacement_name: scx.catalog.minimal_qualification(replacement.name()),
7644        });
7645    }
7646
7647    Ok(Plan::AlterMaterializedViewApplyReplacement(
7648        AlterMaterializedViewApplyReplacementPlan {
7649            id: mv.id(),
7650            replacement_id: replacement.id(),
7651        },
7652    ))
7653}
7654
7655pub fn describe_comment(
7656    _: &StatementContext,
7657    _: CommentStatement<Aug>,
7658) -> Result<StatementDesc, PlanError> {
7659    Ok(StatementDesc::new(None))
7660}
7661
7662pub fn plan_comment(
7663    scx: &mut StatementContext,
7664    stmt: CommentStatement<Aug>,
7665) -> Result<Plan, PlanError> {
7666    const MAX_COMMENT_LENGTH: usize = 1024;
7667
7668    let CommentStatement { object, comment } = stmt;
7669
7670    // TODO(parkmycar): Make max comment length configurable.
7671    if let Some(c) = &comment {
7672        if c.len() > 1024 {
7673            return Err(PlanError::CommentTooLong {
7674                length: c.len(),
7675                max_size: MAX_COMMENT_LENGTH,
7676            });
7677        }
7678    }
7679
7680    let (object_id, column_pos) = match &object {
7681        com_ty @ CommentObjectType::Table { name }
7682        | com_ty @ CommentObjectType::View { name }
7683        | com_ty @ CommentObjectType::MaterializedView { name }
7684        | com_ty @ CommentObjectType::Index { name }
7685        | com_ty @ CommentObjectType::Func { name }
7686        | com_ty @ CommentObjectType::Connection { name }
7687        | com_ty @ CommentObjectType::Source { name }
7688        | com_ty @ CommentObjectType::Sink { name }
7689        | com_ty @ CommentObjectType::Secret { name } => {
7690            let item = scx.get_item_by_resolved_name(name)?;
7691            match (com_ty, item.item_type()) {
7692                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7693                    (CommentObjectId::Table(item.id()), None)
7694                }
7695                (CommentObjectType::View { .. }, CatalogItemType::View) => {
7696                    (CommentObjectId::View(item.id()), None)
7697                }
7698                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7699                    (CommentObjectId::MaterializedView(item.id()), None)
7700                }
7701                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7702                    (CommentObjectId::Index(item.id()), None)
7703                }
7704                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7705                    (CommentObjectId::Func(item.id()), None)
7706                }
7707                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7708                    (CommentObjectId::Connection(item.id()), None)
7709                }
7710                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7711                    (CommentObjectId::Source(item.id()), None)
7712                }
7713                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7714                    (CommentObjectId::Sink(item.id()), None)
7715                }
7716                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7717                    (CommentObjectId::Secret(item.id()), None)
7718                }
7719                (com_ty, cat_ty) => {
7720                    let expected_type = match com_ty {
7721                        CommentObjectType::Table { .. } => ObjectType::Table,
7722                        CommentObjectType::View { .. } => ObjectType::View,
7723                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7724                        CommentObjectType::Index { .. } => ObjectType::Index,
7725                        CommentObjectType::Func { .. } => ObjectType::Func,
7726                        CommentObjectType::Connection { .. } => ObjectType::Connection,
7727                        CommentObjectType::Source { .. } => ObjectType::Source,
7728                        CommentObjectType::Sink { .. } => ObjectType::Sink,
7729                        CommentObjectType::Secret { .. } => ObjectType::Secret,
7730                        _ => sql_bail!("cannot comment on this object type"),
7731                    };
7732
7733                    return Err(PlanError::InvalidObjectType {
7734                        expected_type: SystemObjectType::Object(expected_type),
7735                        actual_type: SystemObjectType::Object(cat_ty.into()),
7736                        object_name: item.name().item.clone(),
7737                    });
7738                }
7739            }
7740        }
7741        CommentObjectType::Type { ty } => match ty {
7742            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7743                sql_bail!("cannot comment on anonymous list or map type");
7744            }
7745            ResolvedDataType::Named { id, modifiers, .. } => {
7746                if !modifiers.is_empty() {
7747                    sql_bail!("cannot comment on type with modifiers");
7748                }
7749                (CommentObjectId::Type(*id), None)
7750            }
7751            ResolvedDataType::Error => bail_internal!("unresolved data type"),
7752        },
7753        CommentObjectType::Column { name } => {
7754            let (item, pos) = scx.get_column_by_resolved_name(name)?;
7755            match item.item_type() {
7756                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7757                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7758                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7759                CatalogItemType::MaterializedView => {
7760                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7761                }
7762                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7763                r => {
7764                    return Err(PlanError::Unsupported {
7765                        feature: format!("Specifying comments on a column of {r}"),
7766                        discussion_no: None,
7767                    });
7768                }
7769            }
7770        }
7771        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7772        CommentObjectType::Database { name } => {
7773            (CommentObjectId::Database(*name.database_id()), None)
7774        }
7775        CommentObjectType::Schema { name } => {
7776            // Temporary schemas cannot have comments - they are connection-specific
7777            // and transient. With lazy temporary schema creation, the temp schema
7778            // may not exist yet, but we still need to return the correct error.
7779            if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
7780                sql_bail!(
7781                    "cannot comment on schema {} because it is a temporary schema",
7782                    mz_repr::namespaces::MZ_TEMP_SCHEMA
7783                );
7784            }
7785            (
7786                CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7787                None,
7788            )
7789        }
7790        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7791        CommentObjectType::ClusterReplica { name } => {
7792            let replica = scx.catalog.resolve_cluster_replica(name)?;
7793            (
7794                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7795                None,
7796            )
7797        }
7798        CommentObjectType::NetworkPolicy { name } => {
7799            (CommentObjectId::NetworkPolicy(name.id), None)
7800        }
7801    };
7802
    // Note: the `mz_comments` table uses an `Int4` for the column position, but in the catalog storage we
    // store a `usize` which would be a `Uint8`. We check that the conversion is safe here because
    // it's the easiest place to raise an error.
7806    //
7807    // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
7808    if let Some(p) = column_pos {
7809        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7810            max_num_columns: MAX_NUM_COLUMNS,
7811            req_num_columns: p,
7812        })?;
7813    }
7814
7815    Ok(Plan::Comment(CommentPlan {
7816        object_id,
7817        sub_component: column_pos,
7818        comment,
7819    }))
7820}
7821
7822pub(crate) fn resolve_cluster<'a>(
7823    scx: &'a StatementContext,
7824    name: &'a Ident,
7825    if_exists: bool,
7826) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7827    match scx.resolve_cluster(Some(name)) {
7828        Ok(cluster) => Ok(Some(cluster)),
7829        Err(_) if if_exists => Ok(None),
7830        Err(e) => Err(e),
7831    }
7832}
7833
7834pub(crate) fn resolve_cluster_replica<'a>(
7835    scx: &'a StatementContext,
7836    name: &QualifiedReplica,
7837    if_exists: bool,
7838) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7839    match scx.resolve_cluster(Some(&name.cluster)) {
7840        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7841            Some(replica_id) => Ok(Some((cluster, *replica_id))),
7842            None if if_exists => Ok(None),
7843            None => Err(sql_err!(
7844                "CLUSTER {} has no CLUSTER REPLICA named {}",
7845                cluster.name(),
7846                name.replica.as_str().quoted(),
7847            )),
7848        },
7849        Err(_) if if_exists => Ok(None),
7850        Err(e) => Err(e),
7851    }
7852}
7853
7854pub(crate) fn resolve_database<'a>(
7855    scx: &'a StatementContext,
7856    name: &'a UnresolvedDatabaseName,
7857    if_exists: bool,
7858) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7859    match scx.resolve_database(name) {
7860        Ok(database) => Ok(Some(database)),
7861        Err(_) if if_exists => Ok(None),
7862        Err(e) => Err(e),
7863    }
7864}
7865
7866pub(crate) fn resolve_schema<'a>(
7867    scx: &'a StatementContext,
7868    name: UnresolvedSchemaName,
7869    if_exists: bool,
7870) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7871    match scx.resolve_schema(name) {
7872        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7873        Err(_) if if_exists => Ok(None),
7874        Err(e) => Err(e),
7875    }
7876}
7877
7878pub(crate) fn resolve_network_policy<'a>(
7879    scx: &'a StatementContext,
7880    name: Ident,
7881    if_exists: bool,
7882) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7883    match scx.catalog.resolve_network_policy(&name.to_string()) {
7884        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7885            id: policy.id(),
7886            name: policy.name().to_string(),
7887        })),
7888        Err(_) if if_exists => Ok(None),
7889        Err(e) => Err(e.into()),
7890    }
7891}
7892
7893pub(crate) fn resolve_item_or_type<'a>(
7894    scx: &'a StatementContext,
7895    object_type: ObjectType,
7896    name: UnresolvedItemName,
7897    if_exists: bool,
7898) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7899    let name = normalize::unresolved_item_name(name)?;
7900    let catalog_item = match object_type {
7901        ObjectType::Type => scx.catalog.resolve_type(&name),
7902        ObjectType::Table
7903        | ObjectType::View
7904        | ObjectType::MaterializedView
7905        | ObjectType::Source
7906        | ObjectType::Sink
7907        | ObjectType::Index
7908        | ObjectType::Role
7909        | ObjectType::Cluster
7910        | ObjectType::ClusterReplica
7911        | ObjectType::Secret
7912        | ObjectType::Connection
7913        | ObjectType::Database
7914        | ObjectType::Schema
7915        | ObjectType::Func
7916        | ObjectType::NetworkPolicy => scx.catalog.resolve_item(&name),
7917    };
7918
7919    match catalog_item {
7920        Ok(item) => {
7921            let is_type = ObjectType::from(item.item_type());
7922            if object_type == is_type {
7923                Ok(Some(item))
7924            } else {
7925                Err(PlanError::MismatchedObjectType {
7926                    name: scx.catalog.minimal_qualification(item.name()),
7927                    is_type,
7928                    expected_type: object_type,
7929                })
7930            }
7931        }
7932        Err(_) if if_exists => Ok(None),
7933        Err(e) => Err(e.into()),
7934    }
7935}
7936
7937/// Returns an error if the given cluster is a managed cluster
7938fn ensure_cluster_is_not_managed(
7939    scx: &StatementContext,
7940    cluster_id: ClusterId,
7941) -> Result<(), PlanError> {
7942    let cluster = scx.catalog.get_cluster(cluster_id);
7943    if cluster.is_managed() {
7944        Err(PlanError::ManagedCluster {
7945            cluster_name: cluster.name().to_string(),
7946        })
7947    } else {
7948        Ok(())
7949    }
7950}